Skip to content

Commit

Permalink
Update to 2.0.16
Browse files Browse the repository at this point in the history
  • Loading branch information
mango-dee committed Nov 20, 2024
1 parent f18f5a6 commit ce03864
Show file tree
Hide file tree
Showing 9 changed files with 1,219 additions and 1,096 deletions.
2,124 changes: 1,165 additions & 959 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "geyser-grpc-connector"
version = "0.10.1+yellowstone.1.12"
version = "2.0.0"
edition = "2021"

description = "Multiplexing and Reconnection on Yellowstone gRPC Geyser client streaming"
Expand All @@ -9,12 +9,12 @@ authors = ["GroovieGermanikus <[email protected]>"]
repository = "https://github.com/blockworks-foundation/geyser-grpc-connector"

[dependencies]
yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { version = "1.12.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-client = { version = "2.0.0", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v2.0.0+solana.2.0.16" }
yellowstone-grpc-proto = { version = "2.0.0", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v2.0.0+solana.2.0.16" }


# required for CommitmentConfig
solana-sdk = "~1.17.15"
solana-sdk = "2.0.16"

url = "2.5.0"
async-stream = "0.3.5"
Expand All @@ -32,6 +32,10 @@ bincode = "1.3.3"

csv = "1.3.0"

[dev-dependencies]
#[dev-dependencies]
tracing-subscriber = "0.3.16"
solana-logger = "1"
solana-logger = "2.0.16"

[patch.crates-io.curve25519-dalek]
git = "https://github.com/anza-xyz/curve25519-dalek.git"
rev = "b500cdc2a920cd5bff9e2dd974d7b97349d61464"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ These are the currently maintained versions of the library: [see Wiki](https://g
```cargo add geyser-grpc-connector ```


An example how to use the library is provided in `stream_blocks_mainnet.rs`.
An example how to use the library is provided in `stream_blocks_mainnet_stream.rs`.

## Known issues
* Library does not support other data than Blocks/Slots very well.
Expand Down
40 changes: 6 additions & 34 deletions examples/stream_blocks_mainnet_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use base64::Engine;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
/// This file mocks the core model of the RPC server.
use solana_sdk::compute_budget;
use solana_sdk::{borsh1, compute_budget};
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
Expand Down Expand Up @@ -97,7 +97,7 @@ impl FromYellowstoneExtractor for BlockMetaExtractor {
}
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init();
Expand Down Expand Up @@ -298,32 +298,6 @@ pub fn map_produced_block(
.collect(),

Check warning on line 298 in examples/stream_blocks_mainnet_stream.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet_stream.rs
});

let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(((units * 1000) / additional_fee) as u64),
));
} else {
return Some((units, None));
}
}
}
None
});

let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);

let cu_requested = message
.instructions()
.iter()
Expand All @@ -332,14 +306,13 @@ pub fn map_produced_block(
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
borsh1::try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
});

let prioritization_fees = message
.instructions()
Expand All @@ -349,15 +322,14 @@ pub fn map_produced_block(
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
borsh1::try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}

None
})
.or(legacy_prioritization_fees);
});

Some(TransactionInfo {
signature: signature.to_string(),
Expand Down
38 changes: 5 additions & 33 deletions examples/stream_blocks_mainnet_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use base64::Engine;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
/// This file mocks the core model of the RPC server.
use solana_sdk::compute_budget;
use solana_sdk::{borsh1, compute_budget};
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
Expand Down Expand Up @@ -268,32 +268,6 @@ pub fn map_produced_block(
.collect(),

Check warning on line 268 in examples/stream_blocks_mainnet_task.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet_task.rs
});

let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(((units * 1000) / additional_fee) as u64),
));
} else {
return Some((units, None));
}
}
}
None
});

let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);

let cu_requested = message
.instructions()
.iter()
Expand All @@ -302,14 +276,13 @@ pub fn map_produced_block(
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
borsh1::try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
});

let prioritization_fees = message
.instructions()
Expand All @@ -319,15 +292,14 @@ pub fn map_produced_block(
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
borsh1::try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}

None
})
.or(legacy_prioritization_fees);
});

Some(TransactionInfo {
signature: signature.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.73.0"
channel = "1.81.0"
20 changes: 9 additions & 11 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use log::{debug, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::Status;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
Expand Down Expand Up @@ -45,21 +46,18 @@ pub fn create_geyser_reconnecting_stream(
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {

let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
let mut client = connect_result?;
let mut builder = GeyserGrpcClient::build_from_shared(addr).unwrap()
.x_token(token).unwrap()
.connect_timeout(connect_timeout.unwrap_or(Duration::from_secs(10)))
.timeout(request_timeout.unwrap_or(Duration::from_secs(10)))
.tls_config(config.unwrap_or(ClientTlsConfig::new())).unwrap();

let mut client = builder.connect().await.unwrap();

debug!("Subscribe with filter {:?}", subscribe_filter);

let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once2(subscribe_filter))
.await;
client.subscribe_once(subscribe_filter)).await;

// maybe not optimal
subscribe_result.map_err(|_| Status::unknown("unspecific subscribe timeout"))?
Expand Down
52 changes: 17 additions & 35 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::task::AbortHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_client::{GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
Expand Down Expand Up @@ -75,52 +76,40 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
attempt,
addr
);
let connect_result = GeyserGrpcClient::connect_with_timeout(
addr,
token,
config,
connect_timeout,
request_timeout,
false,
)
.await;

let mut builder = GeyserGrpcClient::build_from_shared(addr).unwrap()
.x_token(token).unwrap()
.connect_timeout(connect_timeout.unwrap_or(Duration::from_secs(10)))
.timeout(request_timeout.unwrap_or(Duration::from_secs(10)))
.tls_config(config.unwrap_or(ClientTlsConfig::new())).unwrap();

let connect_result = builder.connect().await;

match connect_result {
Ok(client) => ConnectionState::Connecting(attempt, client),
Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::ConfigurationError,
),
Err(GeyserGrpcClientError::MetadataValueError(_)) => {
Err(GeyserGrpcBuilderError::MetadataValueError(_)) => {
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::ConfigurationError,
)
}
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => {
Err(GeyserGrpcBuilderError::InvalidXTokenLength(_)) => {
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::ConfigurationError,
)
}
Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
Err(GeyserGrpcBuilderError::TonicError(tonic_error)) => {
warn!(
"connect failed on {} - aborting: {:?}",
grpc_source, tonic_error
);
ConnectionState::FatalError(attempt + 1, FatalErrorReason::NetworkError)
}
Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => {
warn!(
"connect failed on {} - retrying: {:?}",
grpc_source, tonic_status
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => {
Err(GeyserGrpcBuilderError::EmptyChannel) => {
warn!(
"connect failed with send error on {} - retrying: {:?}",
grpc_source, send_error
"connect failed on {} - tonic::transport::Channel should be created, use `connect` or `connect_lazy` first",
grpc_source,
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Expand All @@ -134,7 +123,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(

let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter),
client.subscribe_once(subscribe_filter),
)
.await;

Expand All @@ -150,13 +139,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}
ConnectionState::Ready(geyser_stream)
}
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!(
"subscribe failed on {} after {} attempts - retrying",
grpc_source, attempt
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!(
"subscribe failed on {} after {} attempts - retrying",
Expand Down
21 changes: 5 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;

use solana_sdk::commitment_config::CommitmentConfig;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
Expand Down Expand Up @@ -156,20 +157,8 @@ impl GeyserFilter {
pub fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
solana_sdk::commitment_config::CommitmentLevel::Processed => CommitmentLevel::Processed,
solana_sdk::commitment_config::CommitmentLevel::Confirmed => CommitmentLevel::Confirmed,
solana_sdk::commitment_config::CommitmentLevel::Finalized => CommitmentLevel::Finalized,
}
}

0 comments on commit ce03864

Please sign in to comment.