From 1fb4c88e94680a87d9a762c4d6db4a8070b5cbae Mon Sep 17 00:00:00 2001
From: Kirill Fomichev
Date: Tue, 28 May 2024 23:19:35 -0400
Subject: [PATCH] proto: add `timestamp` field to `SubscribeUpdate` message

---
 .github/workflows/test.yml                |   2 +
 CHANGELOG.md                              |  33 +
 Cargo.lock                                |  11 +-
 Cargo.toml                                |  15 +-
 examples/rust/Cargo.toml                  |   2 +-
 yellowstone-grpc-client/Cargo.toml        |   2 +-
 yellowstone-grpc-geyser/Cargo.toml        |   2 +-
 yellowstone-grpc-geyser/src/filters.rs    |  23 +-
 yellowstone-grpc-geyser/src/grpc.rs       |   3 +
 yellowstone-grpc-proto/Cargo.toml         |   3 +-
 yellowstone-grpc-proto/proto/geyser.proto |   2 +
 yellowstone-grpc-proto/src/lib.rs         |   2 +-
 yellowstone-grpc-tools/Cargo.toml         |   2 +-
 .../src/bin/grpc-google-pubsub.rs          |   8 +-
 yellowstone-grpc-tools/src/kafka/grpc.rs   |  19 +-
 .../src/scylladb/consumer/grpc.rs          | 867 ++++++++++++++++++
 16 files changed, 959 insertions(+), 37 deletions(-)
 create mode 100644 yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 31b0de9..b7a8a5b 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -9,6 +9,8 @@ on:
       - 'master'
       - 'v1.16'
      - 'v1.17'
+      - 'v1.17-gamma'
+      - 'v1.18-gamma'
   workflow_dispatch:
 
 env:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 602aed6..c08240a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -241,6 +241,17 @@ The minor version will be incremented upon a breaking change and the patch versi
 
 ## 2024-02-06
 
+- yellowstone-grpc-client-1.13.0+solana.1.17.20.gamma
+- yellowstone-grpc-geyser-1.12.0+solana.1.17.20.gamma
+- yellowstone-grpc-proto-1.12.0+solana.1.17.20.gamma
+- yellowstone-grpc-tools-1.0.0-rc.9+solana.1.17.20.gamma
+
+### Features
+
+- proto: add `timestamp` field to `SubscribeUpdate` message
+
+## 2024-02-06
+
 - yellowstone-grpc-client-1.13.0+solana.1.17.20
 - yellowstone-grpc-geyser-1.12.0+solana.1.17.20
 - yellowstone-grpc-proto-1.12.0+solana.1.17.20
@@ -283,6 +294,17 @@ The minor version will be incremented upon a breaking change and the patch versi
 
 - solana: update to 1.17.16 ([#274](https://github.com/rpcpool/yellowstone-grpc/pull/274))
 
+## 2024-01-09
+
+- yellowstone-grpc-client-1.13.0+solana.1.17.15.gamma
+- yellowstone-grpc-geyser-1.12.0+solana.1.17.15.gamma
+- yellowstone-grpc-proto-1.12.0+solana.1.17.15.gamma
+- yellowstone-grpc-tools-1.0.0-rc.9+solana.1.17.15.gamma
+
+### Features
+
+- proto: add `timestamp` field to `SubscribeUpdate` message
+
 ## 2024-01-08
 
 - yellowstone-grpc-client-1.13.0+solana.1.17.15
@@ -295,6 +317,17 @@ The minor version will be incremented upon a breaking change and the patch versi
 - proto: add more convert functions ([#264](https://github.com/rpcpool/yellowstone-grpc/pull/264))
 - geyser: set plugin name to `{name}-{version}` ([#270](https://github.com/rpcpool/yellowstone-grpc/pull/270))
 
+## 2023-12-24
+
+- yellowstone-grpc-client-1.13.0+solana.1.16.21
+- yellowstone-grpc-geyser-1.12.0+solana.1.16.21
+- yellowstone-grpc-proto-1.12.0+solana.1.16.21
+- yellowstone-grpc-tools-1.0.0-rc.10+solana.1.16.21
+
+### Features
+
+- proto: add `timestamp` field to `SubscribeUpdate` message
+
 ## 2023-12-22
 
 - yellowstone-grpc-client-1.12.0+solana.1.17.12
diff --git a/Cargo.lock b/Cargo.lock
index d08bd83..e09832e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5253,7 +5253,7 @@
 
 [[package]]
 name = "yellowstone-grpc-client"
-version = "1.15.0+solana.1.18.21"
+version = "1.15.0+solana.1.18.21.gamma"
 dependencies = [
  "bytes",
  "futures",
@@ -5266,7 +5266,7 @@
 
 [[package]]
 name = "yellowstone-grpc-client-simple"
-version = "1.13.0+solana.1.18.21" +version = "1.13.0+solana.1.18.21.gamma" dependencies = [ "anyhow", "backoff", @@ -5289,7 +5289,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.15.0+solana.1.18.21" +version = "1.15.0+solana.1.18.21.gamma" dependencies = [ "anyhow", "base64 0.21.7", @@ -5322,11 +5322,12 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.14.0+solana.1.18.21" +version = "1.14.0+solana.1.18.21.gamma" dependencies = [ "anyhow", "bincode", "prost 0.12.3", + "prost-types 0.12.3", "protobuf-src", "solana-account-decoder", "solana-sdk", @@ -5337,7 +5338,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.11+solana.1.18.21" +version = "1.0.0-rc.11+solana.1.18.21.gamma" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 084a0fb..e6e05df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [workspace] resolver = "2" members = [ - "examples/rust", # 1.13.0+solana.1.18.21 - "yellowstone-grpc-client", # 1.15.0+solana.1.18.21 - "yellowstone-grpc-geyser", # 1.15.0+solana.1.18.21 - "yellowstone-grpc-proto", # 1.14.0+solana.1.18.21 - "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.21 + "examples/rust", # 1.13.0+solana.1.18.21.gamma + "yellowstone-grpc-client", # 1.15.0+solana.1.18.21.gamma + "yellowstone-grpc-geyser", # 1.15.0+solana.1.18.21.gamma + "yellowstone-grpc-proto", # 1.14.0+solana.1.18.21.gamma + "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.21.gamma ] [workspace.package] @@ -46,6 +46,7 @@ log = "0.4.17" maplit = "1.0.2" prometheus = "0.13.2" prost = "0.12.1" +prost-types = "0.12.1" protobuf-src = "1.1.0" rdkafka = "0.34.0" scylla = "0.13.0" @@ -69,8 +70,8 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" uuid = "1.8.0" vergen = "8.2.1" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.21" } -yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.21" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.21.gamma" } +yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.21.gamma" } [profile.release] lto = true diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index ad571be..b2746a3 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client-simple" -version = "1.13.0+solana.1.18.21" +version = "1.13.0+solana.1.18.21.gamma" authors = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index 8bbcb05..e2ff4b4 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.15.0+solana.1.18.21" +version = "1.15.0+solana.1.18.21.gamma" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 7b51f52..435fca8 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.15.0+solana.1.18.21" +version = "1.15.0+solana.1.18.21.gamma" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git 
index 47c20e2..642cc96 100644
--- a/yellowstone-grpc-geyser/src/filters.rs
+++ b/yellowstone-grpc-geyser/src/filters.rs
@@ -16,15 +16,20 @@ use {
     std::{
         collections::{HashMap, HashSet},
         str::FromStr,
+        time::SystemTime,
     },
-    yellowstone_grpc_proto::prelude::{
-        subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
-        subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
-        subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
-        SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts,
-        SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterBlocks,
-        SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots,
-        SubscribeRequestFilterTransactions, SubscribeUpdate, SubscribeUpdatePong,
+    yellowstone_grpc_proto::{
+        prelude::{
+            subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
+            subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
+            subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
+            SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts,
+            SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterBlocks,
+            SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry,
+            SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdate,
+            SubscribeUpdatePong,
+        },
+        prost_types::Timestamp,
     },
 };
 
@@ -158,6 +163,7 @@ impl Filter {
                     Some(SubscribeUpdate {
                         filters,
                         update_oneof: Some(message.to_proto(&self.accounts_data_slice)),
+                        timestamp: Some(Timestamp::from(SystemTime::now())),
                     })
                 }
             }),
@@ -168,6 +174,7 @@ impl Filter {
         self.ping.map(|id| SubscribeUpdate {
             filters: vec![],
             update_oneof: Some(UpdateOneof::Pong(SubscribeUpdatePong { id })),
+            timestamp: Some(Timestamp::from(SystemTime::now())),
         })
     }
 }
diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs
index 29baa62..d7d34a5 100644
--- a/yellowstone-grpc-geyser/src/grpc.rs
+++ b/yellowstone-grpc-geyser/src/grpc.rs
@@ -24,6 +24,7 @@ use {
             atomic::{AtomicUsize, Ordering},
             Arc,
         },
+        time::SystemTime,
     },
     tokio::{
         fs,
@@ -56,6 +57,7 @@ use {
             SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo,
             SubscribeUpdateTransactionStatus, TransactionError as SubscribeUpdateTransactionError,
         },
+        prost_types::Timestamp,
     },
 };
 
@@ -1315,6 +1317,7 @@ impl Geyser for GrpcService {
             let ping_msg = SubscribeUpdate {
                 filters: vec![],
                 update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})),
+                timestamp: Some(Timestamp::from(SystemTime::now())),
             };
 
             loop {
diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml
index ef26e0a..5360d3f 100644
--- a/yellowstone-grpc-proto/Cargo.toml
+++ b/yellowstone-grpc-proto/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "yellowstone-grpc-proto"
-version = "1.14.0+solana.1.18.21"
+version = "1.14.0+solana.1.18.21.gamma"
 authors = { workspace = true }
 edition = { workspace = true }
 description = "Yellowstone gRPC Geyser Protobuf Definitions"
@@ -13,6 +13,7 @@ publish = true
 [dependencies]
 bincode = { workspace = true }
 prost = { workspace = true }
+prost-types = { workspace = true }
 solana-account-decoder = { workspace = true }
 solana-sdk = { workspace = true }
 solana-transaction-status = { workspace = true }
diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto
index 153b8f5..caa220d 100644
--- a/yellowstone-grpc-proto/proto/geyser.proto
+++ b/yellowstone-grpc-proto/proto/geyser.proto
@@ -1,5 +1,6 @@
 syntax = "proto3";
 
+import "google/protobuf/timestamp.proto";
 import public "solana-storage.proto";
 
 option go_package = "github.com/rpcpool/yellowstone-grpc/examples/golang/proto";
@@ -104,6 +105,7 @@ message SubscribeUpdate {
     SubscribeUpdateBlockMeta block_meta = 7;
     SubscribeUpdateEntry entry = 8;
   }
+  google.protobuf.Timestamp timestamp = 11;
 }
 
 message SubscribeUpdateAccount {
diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs
index 8760c7c..60ebf3a 100644
--- a/yellowstone-grpc-proto/src/lib.rs
+++ b/yellowstone-grpc-proto/src/lib.rs
@@ -16,7 +16,7 @@ pub mod prelude {
     pub use super::{geyser::*, solana::storage::confirmed_block::*};
 }
 
-pub use {prost, tonic};
+pub use {prost, prost_types, tonic};
 
 pub mod convert_to {
     use {
diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml
index d670555..00cc6ca 100644
--- a/yellowstone-grpc-tools/Cargo.toml
+++ b/yellowstone-grpc-tools/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "yellowstone-grpc-tools"
-version = "1.0.0-rc.11+solana.1.18.21"
+version = "1.0.0-rc.11+solana.1.18.21.gamma"
 authors = { workspace = true }
 edition = { workspace = true }
 description = "Yellowstone gRPC Tools"
diff --git a/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs b/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs
index c29538e..31724ba 100644
--- a/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs
+++ b/yellowstone-grpc-tools/src/bin/grpc-google-pubsub.rs
@@ -177,9 +177,9 @@ impl ArgsAction {
                 .context("failed to get message from gRPC")?;
 
             match &message {
-                SubscribeUpdate { filters: _, update_oneof: Some(UpdateOneof::Ping(_)) } => prom::recv_inc(GprcMessageKind::Ping),
-                SubscribeUpdate { filters: _, update_oneof: Some(UpdateOneof::Pong(_)) } => prom::recv_inc(GprcMessageKind::Pong),
-                SubscribeUpdate { filters: _, update_oneof: Some(value) } => {
+                SubscribeUpdate { filters: _, update_oneof: Some(UpdateOneof::Ping(_)), timestamp: _ } => prom::recv_inc(GprcMessageKind::Ping),
+                SubscribeUpdate { filters: _, update_oneof: Some(UpdateOneof::Pong(_)), timestamp: _ } => prom::recv_inc(GprcMessageKind::Pong),
+                SubscribeUpdate { filters: _, update_oneof: Some(value), timestamp: _ } => {
                     if let UpdateOneof::Slot(slot) = value {
                         prom::set_slot_tip(
                             CommitmentLevel::try_from(slot.status).expect("valid commitment"),
@@ -196,7 +196,7 @@ impl ArgsAction {
 
                     prom::recv_inc(prom_kind);
                 },
-                SubscribeUpdate { filters: _, update_oneof: None } => anyhow::bail!("received empty updat emessage"),
+                SubscribeUpdate { filters: _, update_oneof: None, timestamp: _ } => anyhow::bail!("received empty update message"),
             };
         }
     };
diff --git a/yellowstone-grpc-tools/src/kafka/grpc.rs b/yellowstone-grpc-tools/src/kafka/grpc.rs
index bb9997c..f2c703a 100644
--- a/yellowstone-grpc-tools/src/kafka/grpc.rs
+++ b/yellowstone-grpc-tools/src/kafka/grpc.rs
@@ -7,6 +7,7 @@ use {
             atomic::{AtomicUsize, Ordering},
             Arc,
         },
+        time::SystemTime,
     },
     tokio::{
         sync::{broadcast, mpsc, Notify},
@@ -24,13 +25,16 @@ use {
     },
     tonic_health::server::health_reporter,
     tracing::{error, info},
-    yellowstone_grpc_proto::prelude::{
-        geyser_server::{Geyser, GeyserServer},
-        subscribe_update::UpdateOneof,
-        GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest,
-        GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest,
-        GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest,
-        PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdatePing,
+    yellowstone_grpc_proto::{
+        prelude::{
+            geyser_server::{Geyser, GeyserServer},
+            subscribe_update::UpdateOneof,
+            GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest,
+            GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest,
+            GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest,
+            PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdatePing,
+        },
+        prost_types::Timestamp,
     },
 };
 
@@ -119,6 +123,7 @@ impl Geyser for GrpcService {
             let ping_msg = SubscribeUpdate {
                 filters: vec![],
                 update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})),
+                timestamp: Some(Timestamp::from(SystemTime::now())),
             };
 
             loop {
diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
new file mode 100644
index 0000000..73275f7
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
@@ -0,0 +1,867 @@
+use {
+    super::{
+        common::{ConsumerId, ConsumerInfo, InitialOffsetPolicy},
+        shard_iterator::{ShardFilter, ShardIterator},
+    },
+    crate::scylladb::{
+        sink,
+        types::{
+            BlockchainEventType, ProducerId, ProducerInfo, ShardId, ShardOffset, MAX_PRODUCER,
+            MIN_PROCUDER,
+        },
+    },
+    chrono::{DateTime, TimeDelta, Utc},
+    futures::{future::try_join_all, Stream},
+    scylla::{
+        batch::{Batch, BatchType},
+        cql_to_rust::FromCqlVal,
+        prepared_statement::PreparedStatement,
+        transport::query_result::SingleRowTypedError,
+        Session,
+    },
+    std::{
+        collections::{BTreeMap, BTreeSet},
+        iter::repeat,
+        pin::Pin,
+        sync::Arc,
+        time::{Duration, SystemTime},
+    },
+    tokio::{sync::mpsc, time::Instant},
+    tokio_stream::wrappers::ReceiverStream,
+    tonic::Response,
+    tracing::{error, info, warn},
+    uuid::Uuid,
+    yellowstone_grpc_proto::{
+        geyser::{subscribe_update::UpdateOneof, SubscribeUpdate},
+        prost_types::Timestamp,
+        yellowstone::log::{
+            yellowstone_log_server::YellowstoneLog, ConsumeRequest, EventSubscriptionPolicy,
+        },
+    },
+};
+
+const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250);
+
+const FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD: Duration = Duration::from_millis(500);
+
+const DEFAULT_LAST_HEARTBEAT_TIME_DELTA: Duration = Duration::from_secs(10);
+
+const DEFAULT_OFFSET_COMMIT_INTERVAL: Duration = Duration::from_secs(10);
+
+const DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY: usize = 100;
+
+const UPDATE_CONSUMER_SHARD_OFFSET: &str = r###"
+    UPDATE consumer_info
+    SET offset = ?, updated_at = currentTimestamp()
+    WHERE
+        consumer_id = ?
+        AND producer_id = ?
+        AND shard_id = ?
+        AND event_type = ?
+    IF offset = ?
+"###;
+
+///
+/// This query leverages the fact that partition data are always sorted by the clustering key and that Scylla
+/// always iterates or scans data in clustering order. In layman's terms, that means `PER PARTITION LIMIT 1`
+/// will always return the most recent entry for each producer_id.
+pub const LIST_PRODUCER_LAST_HEARTBEAT: &str = r###"
+    SELECT
+        producer_id,
+        created_at
+    FROM producer_slot_seen
+    PER PARTITION LIMIT 1
+"###;
+
+pub const GET_MIN_OFFSET_FOR_SLOT: &str = r###"
+    SELECT
+        shard_id,
+        min(offset)
+    FROM solana.slot_map_mv
+    WHERE slot = ? and producer_id = ?
+    GROUP BY shard_id
+    ORDER BY shard_id;
+"###;
+
+pub const INSERT_CONSUMER_OFFSET: &str = r###"
+    INSERT INTO consumer_info (
+        consumer_id,
+        producer_id,
+        shard_id,
+        event_type,
+        offset,
+        created_at,
+        updated_at
+    )
+    VALUES
+    (?,?,?,?,?,currentTimestamp(), currentTimestamp())
+"###;
+
+pub const GET_CONSUMER_PRODUCER_MAPPING: &str = r###"
+    SELECT
+        producer_id
+    FROM consumer_producer_mapping
+    WHERE consumer_id = ?
+"###;
+
+pub const GET_SHARD_OFFSETS_FOR_CONSUMER_ID: &str = r###"
+    SELECT
+        shard_id,
+        event_type,
+        offset
+    FROM consumer_info
+    WHERE
+        consumer_id = ?
+        AND producer_id = ?
+    ORDER BY shard_id ASC
+"###;
+
+pub const LIST_PRODUCERS_WITH_LOCK: &str = r###"
+    SELECT
+        producer_id
+    FROM producer_lock
+"###;
+
+pub const GET_PRODUCERS_CONSUMER_COUNT: &str = r###"
+    SELECT
+        producer_id,
+        count(1)
+    FROM producer_consumer_mapping_mv
+    GROUP BY producer_id
+"###;
+
+pub const INSERT_CONSUMER_PRODUCER_MAPPING: &str = r###"
+    INSERT INTO consumer_producer_mapping (
+        consumer_id,
+        producer_id,
+        created_at,
+        updated_at
+    )
+    VALUES (?, ?, currentTimestamp(), currentTimestamp())
+"###;
+
+///
+/// CQL does not support OR conditions, so we use >=/<= over the whole producer_id
+/// range to emulate the condition `(producer_id = ? OR <any producer_id>)`.
+pub const GET_PRODUCER_INFO_BY_ID_OR_ANY: &str = r###"
+    SELECT
+        producer_id,
+        num_shards
+    FROM producer_info
+    WHERE producer_id >= ? and producer_id <= ?
+    LIMIT 1
+    ALLOW FILTERING
+"###;
+
+///
+/// Returns the latest offset per shard for a consumer id
+///
+pub async fn get_shard_offsets_info_for_consumer_id(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+    producer_id: ProducerId,
+    ev_types_to_include: &[BlockchainEventType],
+) -> anyhow::Result<Vec<(ShardId, BlockchainEventType, ShardOffset)>> {
+    session
+        .query(
+            GET_SHARD_OFFSETS_FOR_CONSUMER_ID,
+            (consumer_id.as_ref(), producer_id),
+        )
+        .await?
+        .rows_typed_or_empty::<(ShardId, BlockchainEventType, ShardOffset)>()
+        .filter(|result| {
+            if let Ok(triplet) = result {
+                ev_types_to_include.contains(&triplet.1)
+            } else {
+                false
+            }
+        })
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(anyhow::Error::new)
+}
+
+///
+/// Returns the producer id assigned to a specific consumer, if any.
+///
+pub async fn get_producer_id_for_consumer(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+) -> anyhow::Result<Option<ProducerId>> {
+    session
+        .query(GET_CONSUMER_PRODUCER_MAPPING, (consumer_id.as_ref(),))
+        .await?
+        .maybe_first_row_typed::<(ProducerId,)>()
+        .map(|opt| opt.map(|row| row.0))
+        .map_err(anyhow::Error::new)
+}
+
+///
+/// Returns the list of producers that currently hold a lock.
+///
+async fn list_producers_with_lock_held(session: Arc<Session>) -> anyhow::Result<Vec<ProducerId>> {
+    session
+        .query(LIST_PRODUCERS_WITH_LOCK, &[])
+        .await?
+        .rows_typed::<(ProducerId,)>()?
+        .map(|result| result.map(|row| row.0))
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(anyhow::Error::new)
+}
+
+async fn list_producers_heartbeat(
+    session: Arc<Session>,
+    heartbeat_time_dt: Duration,
+) -> anyhow::Result<Vec<ProducerId>> {
+    let utc_now = Utc::now();
+    let min_last_heartbeat = utc_now
+        .checked_sub_signed(TimeDelta::seconds(heartbeat_time_dt.as_secs().try_into()?))
+        .ok_or(anyhow::anyhow!("Invalid heartbeat time delta"))?;
+
+    let producer_id_with_last_hb_datetime_pairs = session
+        .query(LIST_PRODUCER_LAST_HEARTBEAT, &[])
+        .await?
+        .rows_typed::<(ProducerId, DateTime<Utc>)>()?
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(producer_id_with_last_hb_datetime_pairs
+        .into_iter()
+        .filter(|(_, last_hb)| last_hb >= &min_last_heartbeat)
+        .map(|(pid, _)| pid)
+        .collect::<Vec<_>>())
+}
+
+///
+/// Returns the producer id with the fewest assigned consumers.
+///
+async fn get_producer_id_with_least_assigned_consumer(
+    session: Arc<Session>,
+) -> anyhow::Result<ProducerId> {
+    let locked_producers = list_producers_with_lock_held(Arc::clone(&session)).await?;
+
+    info!("{} producer lock(s) detected", locked_producers.len());
+    let recently_active_producers = BTreeSet::from_iter(
+        list_producers_heartbeat(Arc::clone(&session), DEFAULT_LAST_HEARTBEAT_TIME_DELTA).await?,
+    );
+
+    info!(
+        "{} living producer(s) detected",
+        recently_active_producers.len()
+    );
+
+    let eligible_producers = locked_producers
+        .into_iter()
+        .filter(|producer_id| recently_active_producers.contains(producer_id))
+        .collect::<Vec<_>>();
+
+    info!("{} eligible producer(s)", eligible_producers.len());
+
+    let mut producer_count_pairs = session
+        .query(GET_PRODUCERS_CONSUMER_COUNT, &[])
+        .await?
+        .rows_typed::<(ProducerId, i64)>()?
+        .collect::<Result<BTreeMap<_, _>, _>>()?;
+
+    eligible_producers.iter().for_each(|producer_id| {
+        producer_count_pairs
+            .entry(producer_id.to_owned())
+            .or_insert(0);
+    });
+
+    producer_count_pairs
+        .into_iter()
+        .filter(|(producer_id, _)| eligible_producers.contains(producer_id))
+        .min_by_key(|(_, count)| *count)
+        .map(|(producer_id, _)| producer_id)
+        .ok_or(anyhow::anyhow!("No producer is available right now"))
+}
+
+///
+/// Returns a specific producer's info by id, or any available producer's info if `producer_id` is None.
+pub async fn get_producer_info_by_id_or_any(
+    session: Arc<Session>,
+    producer_id: Option<ProducerId>,
+) -> anyhow::Result<Option<ProducerInfo>> {
+    let qr = session
+        .query(
+            GET_PRODUCER_INFO_BY_ID_OR_ANY,
+            (
+                producer_id.unwrap_or(MIN_PROCUDER),
+                producer_id.unwrap_or(MAX_PRODUCER),
+            ),
+        )
+        .await?;
+
+    match qr.single_row_typed::<ProducerInfo>() {
+        Ok(row) => Ok(Some(row)),
+        Err(SingleRowTypedError::BadNumberOfRows(_)) => Ok(None),
+        Err(e) => Err(anyhow::Error::new(e)),
+    }
+}
+
+fn get_blockchain_event_types(
+    event_sub_policy: EventSubscriptionPolicy,
+) -> Vec<BlockchainEventType> {
+    match event_sub_policy {
+        EventSubscriptionPolicy::AccountUpdateOnly => vec![BlockchainEventType::AccountUpdate],
+        EventSubscriptionPolicy::TransactionOnly => vec![BlockchainEventType::NewTransaction],
+        EventSubscriptionPolicy::Both => vec![
+            BlockchainEventType::AccountUpdate,
+            BlockchainEventType::NewTransaction,
+        ],
+    }
+}
+
+async fn register_new_consumer(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+    initial_offset_policy: InitialOffsetPolicy,
+    event_sub_policy: EventSubscriptionPolicy,
+) -> anyhow::Result<ConsumerInfo> {
+    let producer_id = get_producer_id_with_least_assigned_consumer(Arc::clone(&session)).await?;
+
+    let insert_consumer_mapping_ps = session.prepare(INSERT_CONSUMER_PRODUCER_MAPPING).await?;
+    session
+        .execute(
+            &insert_consumer_mapping_ps,
+            (consumer_id.as_ref(), producer_id),
+        )
+        .await?;
+
+    info!(
+        "consumer {:?} successfully assigned producer {:?}",
+        consumer_id.as_ref(),
+        producer_id
+    );
+    let initital_shard_offsets = set_initial_consumer_shard_offsets(
+        Arc::clone(&session),
+        consumer_id.as_ref(),
+        producer_id,
+        initial_offset_policy,
+        event_sub_policy,
+    )
+    .await?;
+    let cs = ConsumerInfo {
+        consumer_id: String::from(consumer_id.as_ref()),
+        producer_id,
+        initital_shard_offsets,
+        subscribed_blockchain_event_types: get_blockchain_event_types(event_sub_policy),
+    };
+
+    Ok(cs)
+}
+
+///
+/// Gets an existing consumer with id = `consumer_id` if it exists, otherwise registers a new consumer.
+///
+async fn get_or_register_consumer(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+    initial_offset_policy: InitialOffsetPolicy,
+    event_sub_policy: EventSubscriptionPolicy,
+) -> anyhow::Result<ConsumerInfo> {
+    let maybe_producer_id =
+        get_producer_id_for_consumer(Arc::clone(&session), consumer_id.as_ref()).await?;
+
+    if let Some(producer_id) = maybe_producer_id {
+        info!(
+            "consumer {:?} exists with producer {:?} assigned to it",
+            consumer_id.as_ref(),
+            producer_id
+        );
+
+        let ev_types = get_blockchain_event_types(event_sub_policy);
+        let shard_offsets = get_shard_offsets_info_for_consumer_id(
+            Arc::clone(&session),
+            consumer_id.as_ref(),
+            producer_id,
+            &ev_types,
+        )
+        .await?;
+        if shard_offsets.is_empty() {
+            anyhow::bail!("Consumer state is corrupted, an existing consumer should already have offsets available.");
+        }
+        let cs = ConsumerInfo {
+            consumer_id: String::from(consumer_id.as_ref()),
+            producer_id,
+            initital_shard_offsets: shard_offsets,
+            subscribed_blockchain_event_types: ev_types,
+        };
+        Ok(cs)
+    } else {
+        let cid = consumer_id.as_ref();
+        info!("Bootstrapping consumer {cid}");
+        register_new_consumer(
+            session,
+            consumer_id,
+            initial_offset_policy,
+            event_sub_policy,
+        )
+        .await
+    }
+}
+
+/// Sets the initial shard offsets for a newly created consumer based on [`InitialOffsetPolicy`].
+///
+/// Similar to seeking in a file, we can seek right at the beginning of the log, completely at the end,
+/// or at the first log event containing a specific slot number.
+async fn set_initial_consumer_shard_offsets(
+    session: Arc<Session>,
+    new_consumer_id: impl AsRef<str>,
+    producer_id: ProducerId,
+    initial_offset_policy: InitialOffsetPolicy,
+    event_sub_policy: EventSubscriptionPolicy,
+) -> anyhow::Result<Vec<(ShardId, BlockchainEventType, ShardOffset)>> {
+    // Resolve the producer info first so we know how many shard counters to create.
+    let producer_info = get_producer_info_by_id_or_any(Arc::clone(&session), Some(producer_id))
+        .await?
+        .unwrap_or_else(|| panic!("Producer Info `{:?}` must exist", producer_id));
+
+    let num_shards = producer_info.num_shards;
+
+    let shard_offset_pairs = match initial_offset_policy {
+        InitialOffsetPolicy::Latest => {
+            sink::get_max_shard_offsets_for_producer(
+                Arc::clone(&session),
+                producer_id,
+                num_shards as usize,
+            )
+            .await?
+        }
+        InitialOffsetPolicy::Earliest => repeat(0)
+            .take(num_shards as usize)
+            .enumerate()
+            .map(|(i, x)| (i as ShardId, x))
+            .collect::<Vec<_>>(),
+        InitialOffsetPolicy::SlotApprox(slot) => session
+            .query(GET_MIN_OFFSET_FOR_SLOT, (slot, producer_id))
+            .await?
+            .rows_typed_or_empty::<(ShardId, ShardOffset)>()
+            .collect::<Result<Vec<_>, _>>()?,
+    };
+
+    let adjustment = match initial_offset_policy {
+        InitialOffsetPolicy::Earliest | InitialOffsetPolicy::SlotApprox(_) => -1,
+        InitialOffsetPolicy::Latest => 0,
+    };
+
+    let insert_consumer_offset_ps: PreparedStatement =
+        session.prepare(INSERT_CONSUMER_OFFSET).await?;
+
+    let mut batch = Batch::new(BatchType::Unlogged);
+    let mut buffer = Vec::with_capacity(shard_offset_pairs.len());
+
+    let ev_types = get_blockchain_event_types(event_sub_policy);
+
+    ev_types
+        .into_iter()
+        .flat_map(|ev_type| {
+            shard_offset_pairs
+                .iter()
+                .cloned()
+                .map(move |(shard_id, offset)| (ev_type, shard_id, offset))
+        })
+        .for_each(|(ev_type, shard_id, offset)| {
+            let offset = offset + adjustment;
+            batch.append_statement(insert_consumer_offset_ps.clone());
+            buffer.push((
+                new_consumer_id.as_ref(),
+                producer_id,
+                shard_id,
+                ev_type,
+                offset,
+            ));
+        });
+
+    session.batch(&batch, &buffer).await?;
+
+    let shard_offsets = buffer
+        .drain(..)
+        .map(|(_, _, shard_id, ev_type, offset)| (shard_id, ev_type, offset))
+        .collect::<Vec<_>>();
+
+    Ok(shard_offsets)
+}
+
+pub struct ScyllaYsLog {
+    session: Arc<Session>,
+}
+
+impl ScyllaYsLog {
+    pub fn new(session: Arc<Session>) -> Self {
+        ScyllaYsLog { session }
+    }
+}
+
+pub type LogStream = Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, tonic::Status>> + Send>>;
+
+#[tonic::async_trait]
+impl YellowstoneLog for ScyllaYsLog {
+    #[doc = r" Server streaming response type for the consume method."]
+    type ConsumeStream = LogStream;
+
+    async fn consume(
+        &self,
+        request: tonic::Request<ConsumeRequest>,
+    ) -> Result<Response<Self::ConsumeStream>, tonic::Status> {
+        let cr = request.into_inner();
+
+        let consumer_id = cr
+            .consumer_id
+            .clone()
+            .unwrap_or_else(|| Uuid::new_v4().to_string());
+        let initial_offset_policy = match cr.initial_offset_policy() {
+            yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Earliest => {
+                InitialOffsetPolicy::Earliest
+            }
+            yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Latest => {
+                InitialOffsetPolicy::Latest
+            }
+            yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Slot => {
+                let slot = cr.at_slot.ok_or(tonic::Status::invalid_argument(
+                    "Expected at_slot when initial_offset_policy is `Slot`",
+                ))?;
+                InitialOffsetPolicy::SlotApprox(slot)
+            }
+        };
+
+        let event_subscription_policy = cr.event_subscription_policy();
+        let account_update_event_filter = cr.account_update_event_filter;
+        let tx_event_filter = cr.tx_event_filter;
+
+        info!(
+            consumer_id = consumer_id,
+            initial_offset_policy = ?initial_offset_policy,
+            event_subscription_policy = ?event_subscription_policy,
+        );
+
+        let req = SpawnGrpcConsumerReq {
+            consumer_id,
+            account_update_event_filter,
+            tx_event_filter,
+            buffer_capacity: None,
+            offset_commit_interval: None,
+        };
+
+        let rx = spawn_grpc_consumer(
+            Arc::clone(&self.session),
+            req,
+            initial_offset_policy,
+            event_subscription_policy,
+        )
+        .await
+        .map_err(|_e| tonic::Status::internal("failed to spawn consumer"))?;
+
+        let ret = ReceiverStream::new(rx);
+
+        let res = Response::new(Box::pin(ret) as Self::ConsumeStream);
+        Ok(res)
+    }
+}
+
+struct GrpcConsumerSource {
+    session: Arc<Session>,
+    consumer_info: ConsumerInfo,
+    sender: mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>,
+    // The interval at which we want to commit our offset progression to Scylla.
+    offset_commit_interval: Duration,
+    shard_iterators: Vec<ShardIterator>,
+}
+
+pub struct SpawnGrpcConsumerReq {
+    pub consumer_id: ConsumerId,
+    pub account_update_event_filter:
+        Option<yellowstone_grpc_proto::yellowstone::log::AccountUpdateEventFilter>,
+    pub tx_event_filter: Option<yellowstone_grpc_proto::yellowstone::log::TransactionEventFilter>,
+    pub buffer_capacity: Option<usize>,
+    pub offset_commit_interval: Option<Duration>,
+}
+
+type GrpcConsumerReceiver = mpsc::Receiver<Result<SubscribeUpdate, tonic::Status>>;
+
+pub async fn spawn_grpc_consumer(
+    session: Arc<Session>,
+    req: SpawnGrpcConsumerReq,
+    initial_offset_policy: InitialOffsetPolicy,
+    event_subscription_policy: EventSubscriptionPolicy,
+) -> anyhow::Result<GrpcConsumerReceiver> {
+    let consumer_info = get_or_register_consumer(
+        Arc::clone(&session),
+        req.consumer_id.as_str(),
+        initial_offset_policy,
+        event_subscription_policy,
+    )
+    .await
+    .map_err(|e| {
+        error!("{:?}", e);
+        tonic::Status::new(
+            tonic::Code::Internal,
+            format!("failed to get or create consumer {:?}", req.consumer_id),
+        )
+    })?;
+    let buffer_capacity = req
+        .buffer_capacity
+        .unwrap_or(DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY);
+    let (sender, receiver) = mpsc::channel(buffer_capacity);
+    let consumer_session = Arc::clone(&session);
+
+    let shard_filter = ShardFilter {
+        tx_account_keys: req
+            .tx_event_filter
+            .map(|f| f.account_keys)
+            .unwrap_or_default(),
+        account_pubkyes: req
+            .account_update_event_filter
+            .as_ref()
+            .map(|f| f.pubkeys.to_owned())
+            .unwrap_or_default(),
+        account_owners: req
+            .account_update_event_filter
+            .as_ref()
+            .map(|f| f.owners.to_owned())
+            .unwrap_or_default(),
+    };
+
+    let shard_iterators = try_join_all(consumer_info.initital_shard_offsets.iter().cloned().map(
+        |(shard_id, ev_type, shard_offset)| {
+            let session = Arc::clone(&session);
+            let producer_id = consumer_info.producer_id;
+            let shard_filter = shard_filter.clone();
+            ShardIterator::new(
+                session,
+                producer_id,
+                shard_id,
+                shard_offset,
+                // The ev_type dictates whether the shard iterator streams account updates or transactions.
+                ev_type,
+                Some(shard_filter),
+            )
+        },
+    ))
+    .await?;
+
+    let consumer = GrpcConsumerSource::new(
+        consumer_session,
+        consumer_info,
+        sender,
+        req.offset_commit_interval
+            .unwrap_or(DEFAULT_OFFSET_COMMIT_INTERVAL),
+        shard_iterators,
+    )
+    .await?;
+
+    tokio::spawn(async move {
+        consumer
+            .into_daemon()
+            .await
+            .expect("consumer terminated abruptly");
+    });
+    Ok(receiver)
+}
+
+struct UpdateShardOffsetClosure {
+    session: Arc<Session>,
+    consumer_id: ConsumerId,
+    producer_id: ProducerId,
+    update_prepared_stmt: PreparedStatement,
+}
+
+impl UpdateShardOffsetClosure {
+    async fn new(
+        session: Arc<Session>,
+        consumer_id: ConsumerId,
+        producer_id: ProducerId,
+    ) -> anyhow::Result<UpdateShardOffsetClosure> {
+        let ps = session.prepare(UPDATE_CONSUMER_SHARD_OFFSET).await?;
+        Ok(UpdateShardOffsetClosure {
+            session,
+            consumer_id,
+            producer_id,
+            update_prepared_stmt: ps,
+        })
+    }
+
+    async fn execute(
+        &self,
+        old_offsets: &[(ShardId, BlockchainEventType, ShardOffset)],
+        new_offsets: &[(ShardId, BlockchainEventType, ShardOffset)],
+    ) -> anyhow::Result<Result<(), ShardOffset>> {
+        // Since the committed offsets are partitioned by consumer_id/producer_id
+        // and we are using LWT, the entire batch is atomic.
+        //
+        // LOGGED batch mode is for batches that span multiple partitions and need atomicity.
+        // In our case, we can disable batch logging since we are batching single-partition data.
+        // Apparently, this is done by default by Scylla, but we make it explicit here since the driver is not quite mature.
+        let mut atomic_batch = Batch::new(BatchType::Unlogged);
+
+        let buffer = old_offsets
+            .iter()
+            .zip(new_offsets.iter())
+            .filter(|((_, _, old_offset), (_, _, new_offset))| old_offset < new_offset)
+            .map(
+                |((shard_id, event_type, old_offset), (shard_id2, event_type2, new_offset))| {
+                    if shard_id != shard_id2 {
+                        panic!("Misaligned consumer offset update");
+                    }
+                    if event_type != event_type2 {
+                        panic!("Misaligned event type during offset update");
+                    }
+                    (
+                        new_offset,
+                        self.consumer_id.clone(),
+                        self.producer_id,
+                        shard_id,
+                        event_type,
+                        old_offset,
+                    )
+                },
+            )
+            .collect::<Vec<_>>();
+
+        if buffer.is_empty() {
+            return Ok(Ok(()));
+        }
+
+        repeat(())
+            .take(buffer.len())
+            .for_each(|_| atomic_batch.append_statement(self.update_prepared_stmt.clone()));
+
+        let query_result = self.session.batch(&atomic_batch, &buffer).await?;
+
+        let row = query_result.first_row().map_err(anyhow::Error::new)?;
+
+        let success = row
+            .columns
+            .first() // The first column of an LWT result is always the "success" field.
+            .and_then(|opt| opt.to_owned())
+            .map(bool::from_cql)
+            .transpose()?
+            .unwrap_or(false);
+
+        let actual_offset = row
+            .columns
+            .get(5) // offset column
+            .and_then(|opt| opt.to_owned())
+            .map(ShardOffset::from_cql)
+            .transpose()?;
+
+        if success {
+            Ok(Ok(()))
+        } else {
+            Ok(Err(actual_offset.expect("missing actual offset from LWT")))
+        }
+    }
+}
+
+impl GrpcConsumerSource {
+    async fn new(
+        session: Arc<Session>,
+        consumer_info: ConsumerInfo,
+        sender: mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>,
+        offset_commit_interval: Duration,
+        mut shard_iterators: Vec<ShardIterator>,
+    ) -> anyhow::Result<Self> {
+        // Prewarm every shard iterator
+        try_join_all(shard_iterators.iter_mut().map(|shard_it| shard_it.warm())).await?;
+
+        Ok(GrpcConsumerSource {
+            session,
+            consumer_info,
+            sender,
+            offset_commit_interval,
+            shard_iterators,
+        })
+    }
+
+    async fn into_daemon(mut self) -> anyhow::Result<()> {
+        let consumer_id = self.consumer_info.consumer_id;
+        let producer_id = self.consumer_info.producer_id;
+        let mut commit_offset_deadline = Instant::now() + self.offset_commit_interval;
+        let update_shard_offset_fn = UpdateShardOffsetClosure::new(
+            Arc::clone(&self.session),
+            consumer_id.clone(),
+            producer_id,
+        )
+        .await?;
+
+        info!("Serving consumer: {:?}", consumer_id);
+
+        let mut last_committed_offsets = self.consumer_info.initital_shard_offsets.clone();
+        last_committed_offsets.sort_by_key(|tuple| (tuple.0, tuple.1));
+        self.shard_iterators
+            .sort_by_key(|it| (it.shard_id, it.event_type));
+
+        let mut max_seen_slot = -1;
+        let mut num_event_between_two_slots = 0;
+
+        let mut t = Instant::now();
+        loop {
+            for shard_it in self.shard_iterators.iter_mut() {
+                let maybe = shard_it.try_next().await?;
+                if let Some(block_chain_event) = maybe {
+                    if t.elapsed() >= FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD {
+                        warn!(
+                            "consumer {consumer_id} micro batch took {:?} to fetch.",
+                            t.elapsed()
+                        );
+                    }
+                    if max_seen_slot < block_chain_event.slot {
+                        info!("Consumer {consumer_id} reached slot {} after {num_event_between_two_slots} blockchain event(s)", block_chain_event.slot);
+                        max_seen_slot = block_chain_event.slot;
+                        num_event_between_two_slots = 0;
+                    }
+                    let geyser_event = match block_chain_event.event_type {
+                        BlockchainEventType::AccountUpdate => {
+                            UpdateOneof::Account(block_chain_event.try_into()?)
+                        }
+                        BlockchainEventType::NewTransaction => {
+                            UpdateOneof::Transaction(block_chain_event.try_into()?)
+                        }
+                    };
+                    let subscribe_update = SubscribeUpdate {
+                        filters: Default::default(),
+                        update_oneof: Some(geyser_event),
+                        timestamp: Some(Timestamp::from(SystemTime::now())),
+                    };
+                    let t_send = Instant::now();
+
+                    if self.sender.send(Ok(subscribe_update)).await.is_err() {
+                        warn!("Consumer {consumer_id} closed its streaming half");
+                        return Ok(());
+                    }
+                    let send_latency = t_send.elapsed();
+                    if send_latency >= CLIENT_LAG_WARN_THRESHOLD {
+                        warn!("Slow read from consumer {consumer_id}, recorded latency: {send_latency:?}");
+                    }
+                    num_event_between_two_slots += 1;
+                    t = Instant::now();
+                }
+            }
+
+            // Every now and then, we commit where the consumer is located in the log.
+            if commit_offset_deadline.elapsed() > Duration::ZERO {
+                let mut new_offsets_to_commit = self
+                    .shard_iterators
+                    .iter()
+                    .map(|shard_it| {
+                        (
+                            shard_it.shard_id,
+                            shard_it.event_type,
+                            shard_it.last_offset(),
+                        )
+                    })
+                    .collect::<Vec<_>>();
+
+                let result = update_shard_offset_fn
+                    .execute(&last_committed_offsets, &new_offsets_to_commit)
+                    .await?;
+
+                if let Err(_actual_offset_in_scylla) = result {
+                    anyhow::bail!("two concurrent connections are using the same consumer instance")
+                }
+                info!("Successfully committed offsets for consumer {consumer_id}");
+                std::mem::swap(&mut new_offsets_to_commit, &mut last_committed_offsets);
+                commit_offset_deadline = Instant::now() + self.offset_commit_interval;
+            }
+        }
+    }
+}
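
Note: every producer path in this patch stamps the new field with `Timestamp::from(SystemTime::now())` at send time. prost-types also ships the reverse conversion (`TryFrom<Timestamp> for SystemTime`), so a subscriber can estimate server-to-client latency per update. A minimal sketch; the function name is illustrative and not part of this patch:

    use std::time::{Duration, SystemTime};
    use yellowstone_grpc_proto::geyser::SubscribeUpdate;

    /// Rough server-to-client latency for one update. Returns `None` when the
    /// field is unset (older producers) or when the clocks are skewed.
    fn server_to_client_latency(update: &SubscribeUpdate) -> Option<Duration> {
        let sent_at: SystemTime = update.timestamp.clone()?.try_into().ok()?;
        SystemTime::now().duration_since(sent_at).ok()
    }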
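Because grpc-google-pubsub.rs matches `SubscribeUpdate` with exhaustive struct patterns, every new proto field forces each arm to be edited (hence the `timestamp: _` additions above). A hypothetical alternative for call sites that ignore the field is the `..` rest pattern, which also absorbs future additions; the explicit `timestamp: _` kept in this patch is the stricter choice, since it makes the compiler flag every consumer when the message shape changes:

    use yellowstone_grpc_proto::geyser::{subscribe_update::UpdateOneof, SubscribeUpdate};

    // Illustrative only: classify an update without naming every field.
    fn describe(update: &SubscribeUpdate) -> &'static str {
        match update {
            SubscribeUpdate { update_oneof: Some(UpdateOneof::Ping(_)), .. } => "ping",
            SubscribeUpdate { update_oneof: Some(_), .. } => "data",
            SubscribeUpdate { update_oneof: None, .. } => "empty",
        }
    }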
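The new ScyllaDB consumer can also be driven directly, without going through the tonic service. A sketch under the assumption of an already-connected `scylla::Session` and the items of the new module in scope; the `run` wrapper and consumer id are hypothetical:

    use std::sync::Arc;

    async fn run(session: Arc<scylla::Session>) -> anyhow::Result<()> {
        let req = SpawnGrpcConsumerReq {
            consumer_id: "example-consumer".to_string(),
            account_update_event_filter: None,
            tx_event_filter: None,
            buffer_capacity: None,        // falls back to DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY
            offset_commit_interval: None, // falls back to DEFAULT_OFFSET_COMMIT_INTERVAL
        };
        let mut rx = spawn_grpc_consumer(
            session,
            req,
            InitialOffsetPolicy::Latest,
            EventSubscriptionPolicy::Both,
        )
        .await?;
        while let Some(update) = rx.recv().await {
            // Each item mirrors what the tonic stream carries, timestamp included.
            println!("{:?}", update?.timestamp);
        }
        Ok(())
    }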