From 9c5230c0054a8ab723d85591e6ccd69582419fd5 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 2 Nov 2024 10:02:43 +0200 Subject: [PATCH] geyser: use Arc wrapped messages in block message (#446) --- CHANGELOG.md | 1 + yellowstone-grpc-geyser/src/filters.rs | 250 ++++++++++-- yellowstone-grpc-geyser/src/grpc.rs | 539 +++---------------------- yellowstone-grpc-geyser/src/lib.rs | 5 +- yellowstone-grpc-geyser/src/message.rs | 228 +++++++++++ yellowstone-grpc-geyser/src/plugin.rs | 11 +- 6 files changed, 495 insertions(+), 539 deletions(-) create mode 100644 yellowstone-grpc-geyser/src/message.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index a576a175..f6abbc80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The minor version will be incremented upon a breaking change and the patch versi - proto: use `gzip`/`zstd` features by default ([#436](https://github.com/rpcpool/yellowstone-grpc/pull/436)) - geyser: optimize consuming of new filters ([#439](https://github.com/rpcpool/yellowstone-grpc/pull/439)) +- geyser: use Arc wrapped messages in block message ([#446](https://github.com/rpcpool/yellowstone-grpc/pull/446)) ### Breaking diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index 61210e67..0006f892 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -5,9 +5,9 @@ use { ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntry, ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions, }, - grpc::{ - Message, MessageAccount, MessageBlock, MessageBlockMeta, MessageEntry, MessageRef, - MessageSlot, MessageTransaction, + message::{ + Message, MessageAccount, MessageAccountInfo, MessageBlock, MessageBlockMeta, + MessageEntry, MessageSlot, MessageTransaction, MessageTransactionInfo, }, }, base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, @@ -16,18 +16,172 @@ use { std::{ collections::{HashMap, HashSet}, str::FromStr, + sync::Arc, }, - yellowstone_grpc_proto::prelude::{ - subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, - subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, - SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, - SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeUpdate, SubscribeUpdatePong, + yellowstone_grpc_proto::{ + convert_to, + prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, + SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, + SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdate, + SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock, + SubscribeUpdateBlockMeta, SubscribeUpdateEntry, SubscribeUpdatePong, + SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, TransactionError as SubscribeUpdateTransactionError, + }, }, }; +#[derive(Debug, Clone)] +pub enum FilteredMessage<'a> { + Slot(&'a MessageSlot), + Account(&'a MessageAccount), + Transaction(&'a MessageTransaction), + TransactionStatus(&'a MessageTransaction), + Entry(&'a MessageEntry), + Block(MessageBlock), + BlockMeta(&'a MessageBlockMeta), +} + +impl<'a> FilteredMessage<'a> { + fn as_proto_account( + message: &MessageAccountInfo, + accounts_data_slice: &[FilterAccountsDataSlice], + ) -> SubscribeUpdateAccountInfo { + let data = if accounts_data_slice.is_empty() { + message.data.clone() + } else { + let mut data = Vec::with_capacity(accounts_data_slice.iter().map(|ds| ds.length).sum()); + for data_slice in accounts_data_slice { + if message.data.len() >= data_slice.end { + data.extend_from_slice(&message.data[data_slice.start..data_slice.end]); + } + } + data + }; + SubscribeUpdateAccountInfo { + pubkey: message.pubkey.as_ref().into(), + lamports: message.lamports, + owner: message.owner.as_ref().into(), + executable: message.executable, + rent_epoch: message.rent_epoch, + data, + write_version: message.write_version, + txn_signature: message.txn_signature.map(|s| s.as_ref().into()), + } + } + + fn as_proto_transaction(message: &MessageTransactionInfo) -> SubscribeUpdateTransactionInfo { + SubscribeUpdateTransactionInfo { + signature: message.signature.as_ref().into(), + is_vote: message.is_vote, + transaction: Some(convert_to::create_transaction(&message.transaction)), + meta: Some(convert_to::create_transaction_meta(&message.meta)), + index: message.index as u64, + } + } + + fn as_proto_entry(message: &MessageEntry) -> SubscribeUpdateEntry { + SubscribeUpdateEntry { + slot: message.slot, + index: message.index as u64, + num_hashes: message.num_hashes, + hash: message.hash.into(), + executed_transaction_count: message.executed_transaction_count, + starting_transaction_index: message.starting_transaction_index, + } + } + + pub fn as_proto(&self, accounts_data_slice: &[FilterAccountsDataSlice]) -> UpdateOneof { + match self { + Self::Slot(message) => UpdateOneof::Slot(SubscribeUpdateSlot { + slot: message.slot, + parent: message.parent, + status: message.status as i32, + }), + Self::Account(message) => UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(Self::as_proto_account( + message.account.as_ref(), + accounts_data_slice, + )), + slot: message.slot, + is_startup: message.is_startup, + }), + Self::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction { + transaction: Some(Self::as_proto_transaction(message.transaction.as_ref())), + slot: message.slot, + }), + Self::TransactionStatus(message) => { + UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus { + slot: message.slot, + signature: message.transaction.signature.as_ref().into(), + is_vote: message.transaction.is_vote, + index: message.transaction.index as u64, + err: match &message.transaction.meta.status { + Ok(()) => None, + Err(err) => Some(SubscribeUpdateTransactionError { + err: bincode::serialize(&err) + .expect("transaction error to serialize to bytes"), + }), + }, + }) + } + Self::Entry(message) => UpdateOneof::Entry(Self::as_proto_entry(message)), + Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { + slot: message.meta.slot, + blockhash: message.meta.blockhash.clone(), + rewards: Some(convert_to::create_rewards_obj( + message.meta.rewards.as_slice(), + message.meta.num_partitions, + )), + block_time: message.meta.block_time.map(convert_to::create_timestamp), + block_height: message + .meta + .block_height + .map(convert_to::create_block_height), + parent_slot: message.meta.parent_slot, + parent_blockhash: message.meta.parent_blockhash.clone(), + executed_transaction_count: message.meta.executed_transaction_count, + transactions: message + .transactions + .iter() + .map(|tx| Self::as_proto_transaction(tx.as_ref())) + .collect(), + updated_account_count: message.updated_account_count, + accounts: message + .accounts + .iter() + .map(|acc| Self::as_proto_account(acc.as_ref(), accounts_data_slice)) + .collect(), + entries_count: message.meta.entries_count, + entries: message + .entries + .iter() + .map(|entry| Self::as_proto_entry(entry.as_ref())) + .collect(), + }), + Self::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta { + slot: message.slot, + blockhash: message.blockhash.clone(), + rewards: Some(convert_to::create_rewards_obj( + message.rewards.as_slice(), + message.num_partitions, + )), + block_time: message.block_time.map(convert_to::create_timestamp), + block_height: message.block_height.map(convert_to::create_block_height), + parent_slot: message.parent_slot, + parent_blockhash: message.parent_blockhash.clone(), + executed_transaction_count: message.executed_transaction_count, + entries_count: message.entries_count, + }), + } + } +} + #[derive(Debug, Clone)] pub struct Filter { accounts: FilterAccounts, @@ -129,7 +283,7 @@ impl Filter { &'a self, message: &'a Message, commitment: Option, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { match message { Message::Account(message) => self.accounts.get_filters(message), Message::Slot(message) => self.slots.get_filters(message, commitment), @@ -157,7 +311,7 @@ impl Filter { } else { Some(SubscribeUpdate { filters, - update_oneof: Some(message.to_proto(&self.accounts_data_slice)), + update_oneof: Some(message.as_proto(&self.accounts_data_slice)), }) } }), @@ -239,14 +393,14 @@ impl FilterAccounts { fn get_filters<'a>( &'a self, message: &'a MessageAccount, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { let mut filter = FilterAccountsMatch::new(self); filter.match_account(&message.account.pubkey); filter.match_owner(&message.account.owner); filter.match_data(&message.account.data); Box::new(std::iter::once(( filter.get_filters(), - MessageRef::Account(message), + FilteredMessage::Account(message), ))) } } @@ -439,7 +593,7 @@ impl FilterSlots { &'a self, message: &'a MessageSlot, commitment: Option, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(std::iter::once(( self.filters .iter() @@ -451,7 +605,7 @@ impl FilterSlots { } }) .collect(), - MessageRef::Slot(message), + FilteredMessage::Slot(message), ))) } } @@ -547,7 +701,7 @@ impl FilterTransactions { pub fn get_filters<'a>( &'a self, message: &'a MessageTransaction, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { let filters = self .filters .iter() @@ -622,8 +776,10 @@ impl FilterTransactions { }) .collect(); let message = match self.filter_type { - FilterTransactionsType::Transaction => MessageRef::Transaction(message), - FilterTransactionsType::TransactionStatus => MessageRef::TransactionStatus(message), + FilterTransactionsType::Transaction => FilteredMessage::Transaction(message), + FilterTransactionsType::TransactionStatus => { + FilteredMessage::TransactionStatus(message) + } }; Box::new(std::iter::once((filters, message))) } @@ -653,10 +809,10 @@ impl FilterEntry { fn get_filters<'a>( &'a self, message: &'a MessageEntry, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(std::iter::once(( self.filters.clone(), - MessageRef::Entry(message), + FilteredMessage::Entry(message), ))) } } @@ -723,7 +879,7 @@ impl FilterBlocks { fn get_filters<'a>( &'a self, message: &'a MessageBlock, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(self.filters.iter().map(move |(filter, inner)| { #[allow(clippy::unnecessary_filter_map)] let transactions = if matches!(inner.include_transactions, None | Some(true)) { @@ -742,7 +898,7 @@ impl FilterBlocks { return None; } - Some(tx) + Some(Arc::clone(tx)) }) .collect::>() } else { @@ -764,7 +920,7 @@ impl FilterBlocks { return None; } - Some(account) + Some(Arc::clone(account)) }) .collect::>() } else { @@ -772,14 +928,20 @@ impl FilterBlocks { }; let entries = if inner.include_entries == Some(true) { - message.entries.iter().collect::>() + message.entries.to_vec() } else { vec![] }; ( vec![filter.clone()], - MessageRef::Block((message, transactions, accounts, entries).into()), + FilteredMessage::Block(MessageBlock { + meta: Arc::clone(&message.meta), + transactions, + updated_account_count: message.updated_account_count, + accounts, + entries, + }), ) })) } @@ -809,10 +971,10 @@ impl FilterBlocksMeta { fn get_filters<'a>( &'a self, message: &'a MessageBlockMeta, - ) -> Box, MessageRef<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(std::iter::once(( self.filters.clone(), - MessageRef::BlockMeta(message), + FilteredMessage::BlockMeta(message), ))) } } @@ -857,10 +1019,11 @@ impl FilterAccountsDataSlice { #[cfg(test)] mod tests { use { + super::FilteredMessage, crate::{ config::ConfigGrpcFilters, filters::Filter, - grpc::{Message, MessageRef, MessageTransaction, MessageTransactionInfo}, + message::{Message, MessageTransaction, MessageTransactionInfo}, }, solana_sdk::{ hash::Hash, @@ -870,7 +1033,7 @@ mod tests { transaction::{SanitizedTransaction, Transaction}, }, solana_transaction_status::TransactionStatusMeta, - std::collections::HashMap, + std::{collections::HashMap, sync::Arc}, yellowstone_grpc_proto::geyser::{ SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions, }, @@ -908,13 +1071,13 @@ mod tests { }; let sig = sanitized_transaction.signature(); MessageTransaction { - transaction: MessageTransactionInfo { + transaction: Arc::new(MessageTransactionInfo { signature: *sig, is_vote: true, transaction: sanitized_transaction, meta, index: 1, - }, + }), slot: 100, } } @@ -1082,9 +1245,12 @@ mod tests { let updates = filter.get_filters(&message, None).collect::>(); assert_eq!(updates.len(), 2); assert_eq!(updates[0].0, vec!["serum"]); - assert!(matches!(updates[0].1, MessageRef::Transaction(_))); + assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); assert_eq!(updates[1].0, Vec::::new()); - assert!(matches!(updates[1].1, MessageRef::TransactionStatus(_))); + assert!(matches!( + updates[1].1, + FilteredMessage::TransactionStatus(_) + )); } #[test] @@ -1129,9 +1295,12 @@ mod tests { let updates = filter.get_filters(&message, None).collect::>(); assert_eq!(updates.len(), 2); assert_eq!(updates[0].0, vec!["serum"]); - assert!(matches!(updates[0].1, MessageRef::Transaction(_))); + assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); assert_eq!(updates[1].0, Vec::::new()); - assert!(matches!(updates[1].1, MessageRef::TransactionStatus(_))); + assert!(matches!( + updates[1].1, + FilteredMessage::TransactionStatus(_) + )); } #[test] @@ -1228,9 +1397,12 @@ mod tests { let updates = filter.get_filters(&message, None).collect::>(); assert_eq!(updates.len(), 2); assert_eq!(updates[0].0, vec!["serum"]); - assert!(matches!(updates[0].1, MessageRef::Transaction(_))); + assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); assert_eq!(updates[1].0, Vec::::new()); - assert!(matches!(updates[1].1, MessageRef::TransactionStatus(_))); + assert!(matches!( + updates[1].1, + FilteredMessage::TransactionStatus(_) + )); } #[test] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index c0da27ae..54252c67 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1,23 +1,17 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters}, - filters::{Filter, FilterAccountsDataSlice}, + filters::Filter, + message::{Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo}, metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, }, - agave_geyser_plugin_interface::geyser_plugin_interface::{ - ReplicaAccountInfoV3, ReplicaBlockInfoV4, ReplicaEntryInfoV2, ReplicaTransactionInfoV2, - SlotStatus, - }, anyhow::Context, log::{error, info}, solana_sdk::{ - clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES}, + clock::{Slot, MAX_RECENT_BLOCKHASHES}, pubkey::Pubkey, - signature::Signature, - transaction::SanitizedTransaction, }, - solana_transaction_status::{Reward, TransactionStatusMeta}, std::{ collections::{BTreeMap, HashMap}, sync::{ @@ -42,457 +36,16 @@ use { Request, Response, Result as TonicResult, Status, Streaming, }, tonic_health::server::health_reporter, - yellowstone_grpc_proto::{ - convert_to, - prelude::{ - geyser_server::{Geyser, GeyserServer}, - subscribe_update::UpdateOneof, - CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, - GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, - GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, - IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, - SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock, - SubscribeUpdateBlockMeta, SubscribeUpdateEntry, SubscribeUpdatePing, - SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, - SubscribeUpdateTransactionStatus, TransactionError as SubscribeUpdateTransactionError, - }, + yellowstone_grpc_proto::prelude::{ + geyser_server::{Geyser, GeyserServer}, + subscribe_update::UpdateOneof, + CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, + GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, + GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, + PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdatePing, }, }; -#[derive(Debug, Clone)] -pub struct MessageAccountInfo { - pub pubkey: Pubkey, - pub lamports: u64, - pub owner: Pubkey, - pub executable: bool, - pub rent_epoch: u64, - pub data: Vec, - pub write_version: u64, - pub txn_signature: Option, -} - -impl MessageAccountInfo { - fn to_proto( - &self, - accounts_data_slice: &[FilterAccountsDataSlice], - ) -> SubscribeUpdateAccountInfo { - let data = if accounts_data_slice.is_empty() { - self.data.clone() - } else { - let mut data = Vec::with_capacity(accounts_data_slice.iter().map(|ds| ds.length).sum()); - for data_slice in accounts_data_slice { - if self.data.len() >= data_slice.end { - data.extend_from_slice(&self.data[data_slice.start..data_slice.end]); - } - } - data - }; - SubscribeUpdateAccountInfo { - pubkey: self.pubkey.as_ref().into(), - lamports: self.lamports, - owner: self.owner.as_ref().into(), - executable: self.executable, - rent_epoch: self.rent_epoch, - data, - write_version: self.write_version, - txn_signature: self.txn_signature.map(|s| s.as_ref().into()), - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageAccount { - pub account: MessageAccountInfo, - pub slot: u64, - pub is_startup: bool, -} - -impl<'a> From<(&'a ReplicaAccountInfoV3<'a>, u64, bool)> for MessageAccount { - fn from((account, slot, is_startup): (&'a ReplicaAccountInfoV3<'a>, u64, bool)) -> Self { - Self { - account: MessageAccountInfo { - pubkey: Pubkey::try_from(account.pubkey).expect("valid Pubkey"), - lamports: account.lamports, - owner: Pubkey::try_from(account.owner).expect("valid Pubkey"), - executable: account.executable, - rent_epoch: account.rent_epoch, - data: account.data.into(), - write_version: account.write_version, - txn_signature: account.txn.map(|txn| *txn.signature()), - }, - slot, - is_startup, - } - } -} - -#[derive(Debug, Clone, Copy)] -pub struct MessageSlot { - pub slot: u64, - pub parent: Option, - pub status: CommitmentLevel, -} - -impl From<(u64, Option, SlotStatus)> for MessageSlot { - fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { - Self { - slot, - parent, - status: match status { - SlotStatus::Processed => CommitmentLevel::Processed, - SlotStatus::Confirmed => CommitmentLevel::Confirmed, - SlotStatus::Rooted => CommitmentLevel::Finalized, - }, - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageTransactionInfo { - pub signature: Signature, - pub is_vote: bool, - pub transaction: SanitizedTransaction, - pub meta: TransactionStatusMeta, - pub index: usize, -} - -impl MessageTransactionInfo { - fn to_proto(&self) -> SubscribeUpdateTransactionInfo { - SubscribeUpdateTransactionInfo { - signature: self.signature.as_ref().into(), - is_vote: self.is_vote, - transaction: Some(convert_to::create_transaction(&self.transaction)), - meta: Some(convert_to::create_transaction_meta(&self.meta)), - index: self.index as u64, - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageTransaction { - pub transaction: MessageTransactionInfo, - pub slot: u64, -} - -impl<'a> From<(&'a ReplicaTransactionInfoV2<'a>, u64)> for MessageTransaction { - fn from((transaction, slot): (&'a ReplicaTransactionInfoV2<'a>, u64)) -> Self { - Self { - transaction: MessageTransactionInfo { - signature: *transaction.signature, - is_vote: transaction.is_vote, - transaction: transaction.transaction.clone(), - meta: transaction.transaction_status_meta.clone(), - index: transaction.index, - }, - slot, - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageEntry { - pub slot: u64, - pub index: usize, - pub num_hashes: u64, - pub hash: Vec, - pub executed_transaction_count: u64, - pub starting_transaction_index: u64, -} - -impl From<&ReplicaEntryInfoV2<'_>> for MessageEntry { - fn from(entry: &ReplicaEntryInfoV2) -> Self { - Self { - slot: entry.slot, - index: entry.index, - num_hashes: entry.num_hashes, - hash: entry.hash.into(), - executed_transaction_count: entry.executed_transaction_count, - starting_transaction_index: entry - .starting_transaction_index - .try_into() - .expect("failed convert usize to u64"), - } - } -} - -impl MessageEntry { - fn to_proto(&self) -> SubscribeUpdateEntry { - SubscribeUpdateEntry { - slot: self.slot, - index: self.index as u64, - num_hashes: self.num_hashes, - hash: self.hash.clone(), - executed_transaction_count: self.executed_transaction_count, - starting_transaction_index: self.starting_transaction_index, - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageBlock { - pub parent_slot: u64, - pub slot: u64, - pub parent_blockhash: String, - pub blockhash: String, - pub rewards: Vec, - pub num_partitions: Option, - pub block_time: Option, - pub block_height: Option, - pub executed_transaction_count: u64, - pub transactions: Vec, - pub updated_account_count: u64, - pub accounts: Vec, - pub entries_count: u64, - pub entries: Vec, -} - -impl - From<( - MessageBlockMeta, - Vec, - Vec, - Vec, - )> for MessageBlock -{ - fn from( - (blockinfo, transactions, accounts, entries): ( - MessageBlockMeta, - Vec, - Vec, - Vec, - ), - ) -> Self { - Self { - parent_slot: blockinfo.parent_slot, - slot: blockinfo.slot, - blockhash: blockinfo.blockhash, - parent_blockhash: blockinfo.parent_blockhash, - rewards: blockinfo.rewards, - num_partitions: blockinfo.num_partitions, - block_time: blockinfo.block_time, - block_height: blockinfo.block_height, - executed_transaction_count: blockinfo.executed_transaction_count, - transactions, - updated_account_count: accounts.len() as u64, - accounts, - entries_count: entries.len() as u64, - entries, - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageBlockMeta { - pub parent_slot: u64, - pub slot: u64, - pub parent_blockhash: String, - pub blockhash: String, - pub rewards: Vec, - pub num_partitions: Option, - pub block_time: Option, - pub block_height: Option, - pub executed_transaction_count: u64, - pub entries_count: u64, -} - -impl<'a> From<&'a ReplicaBlockInfoV4<'a>> for MessageBlockMeta { - fn from(blockinfo: &'a ReplicaBlockInfoV4<'a>) -> Self { - Self { - parent_slot: blockinfo.parent_slot, - slot: blockinfo.slot, - parent_blockhash: blockinfo.parent_blockhash.to_string(), - blockhash: blockinfo.blockhash.to_string(), - rewards: blockinfo.rewards.rewards.clone(), - num_partitions: blockinfo.rewards.num_partitions, - block_time: blockinfo.block_time, - block_height: blockinfo.block_height, - executed_transaction_count: blockinfo.executed_transaction_count, - entries_count: blockinfo.entry_count, - } - } -} - -#[derive(Debug, Clone)] -#[allow(clippy::large_enum_variant)] -pub enum Message { - Slot(MessageSlot), - Account(MessageAccount), - Transaction(MessageTransaction), - Entry(MessageEntry), - Block(MessageBlock), - BlockMeta(MessageBlockMeta), -} - -impl Message { - pub const fn get_slot(&self) -> u64 { - match self { - Self::Slot(msg) => msg.slot, - Self::Account(msg) => msg.slot, - Self::Transaction(msg) => msg.slot, - Self::Entry(msg) => msg.slot, - Self::Block(msg) => msg.slot, - Self::BlockMeta(msg) => msg.slot, - } - } - - pub const fn kind(&self) -> &'static str { - match self { - Self::Slot(_) => "Slot", - Self::Account(_) => "Account", - Self::Transaction(_) => "Transaction", - Self::Entry(_) => "Entry", - Self::Block(_) => "Block", - Self::BlockMeta(_) => "BlockMeta", - } - } -} - -#[derive(Debug, Clone)] -pub struct MessageBlockRef<'a> { - pub parent_slot: u64, - pub slot: u64, - pub parent_blockhash: &'a String, - pub blockhash: &'a String, - pub rewards: &'a Vec, - pub num_partitions: Option, - pub block_time: Option, - pub block_height: Option, - pub executed_transaction_count: u64, - pub transactions: Vec<&'a MessageTransactionInfo>, - pub updated_account_count: u64, - pub accounts: Vec<&'a MessageAccountInfo>, - pub entries_count: u64, - pub entries: Vec<&'a MessageEntry>, -} - -impl<'a> - From<( - &'a MessageBlock, - Vec<&'a MessageTransactionInfo>, - Vec<&'a MessageAccountInfo>, - Vec<&'a MessageEntry>, - )> for MessageBlockRef<'a> -{ - fn from( - (block, transactions, accounts, entries): ( - &'a MessageBlock, - Vec<&'a MessageTransactionInfo>, - Vec<&'a MessageAccountInfo>, - Vec<&'a MessageEntry>, - ), - ) -> Self { - Self { - parent_slot: block.parent_slot, - slot: block.slot, - parent_blockhash: &block.parent_blockhash, - blockhash: &block.blockhash, - rewards: &block.rewards, - num_partitions: block.num_partitions, - block_time: block.block_time, - block_height: block.block_height, - executed_transaction_count: block.executed_transaction_count, - transactions, - updated_account_count: block.updated_account_count, - accounts, - entries_count: block.entries_count, - entries, - } - } -} - -#[derive(Debug, Clone)] -#[allow(clippy::large_enum_variant)] -pub enum MessageRef<'a> { - Slot(&'a MessageSlot), - Account(&'a MessageAccount), - Transaction(&'a MessageTransaction), - TransactionStatus(&'a MessageTransaction), - Entry(&'a MessageEntry), - Block(MessageBlockRef<'a>), - BlockMeta(&'a MessageBlockMeta), -} - -impl<'a> MessageRef<'a> { - pub fn to_proto(&self, accounts_data_slice: &[FilterAccountsDataSlice]) -> UpdateOneof { - match self { - Self::Slot(message) => UpdateOneof::Slot(SubscribeUpdateSlot { - slot: message.slot, - parent: message.parent, - status: message.status as i32, - }), - Self::Account(message) => UpdateOneof::Account(SubscribeUpdateAccount { - account: Some(message.account.to_proto(accounts_data_slice)), - slot: message.slot, - is_startup: message.is_startup, - }), - Self::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction { - transaction: Some(message.transaction.to_proto()), - slot: message.slot, - }), - Self::TransactionStatus(message) => { - UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus { - slot: message.slot, - signature: message.transaction.signature.as_ref().into(), - is_vote: message.transaction.is_vote, - index: message.transaction.index as u64, - err: match &message.transaction.meta.status { - Ok(()) => None, - Err(err) => Some(SubscribeUpdateTransactionError { - err: bincode::serialize(&err) - .expect("transaction error to serialize to bytes"), - }), - }, - }) - } - Self::Entry(message) => UpdateOneof::Entry(message.to_proto()), - Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { - slot: message.slot, - blockhash: message.blockhash.clone(), - rewards: Some(convert_to::create_rewards_obj( - message.rewards.as_slice(), - message.num_partitions, - )), - block_time: message.block_time.map(convert_to::create_timestamp), - block_height: message.block_height.map(convert_to::create_block_height), - parent_slot: message.parent_slot, - parent_blockhash: message.parent_blockhash.clone(), - executed_transaction_count: message.executed_transaction_count, - transactions: message - .transactions - .iter() - .map(|tx| tx.to_proto()) - .collect(), - updated_account_count: message.updated_account_count, - accounts: message - .accounts - .iter() - .map(|acc| acc.to_proto(accounts_data_slice)) - .collect(), - entries_count: message.entries_count, - entries: message - .entries - .iter() - .map(|entry| entry.to_proto()) - .collect(), - }), - Self::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta { - slot: message.slot, - blockhash: message.blockhash.clone(), - rewards: Some(convert_to::create_rewards_obj( - message.rewards.as_slice(), - message.num_partitions, - )), - block_time: message.block_time.map(convert_to::create_timestamp), - block_height: message.block_height.map(convert_to::create_block_height), - parent_slot: message.parent_slot, - parent_blockhash: message.parent_blockhash.clone(), - executed_transaction_count: message.executed_transaction_count, - entries_count: message.entries_count, - }), - } - } -} - #[derive(Debug)] struct BlockhashStatus { slot: u64, @@ -514,7 +67,7 @@ impl BlockhashStatus { #[derive(Debug, Default)] struct BlockMetaStorageInner { - blocks: HashMap, + blocks: HashMap>, blockhashes: HashMap, processed: Option, confirmed: Option, @@ -669,11 +222,11 @@ impl BlockMetaStorage { #[derive(Debug, Default)] struct SlotMessages { - messages: Vec>>, // Option is used for accounts with low write_version - block_meta: Option, - transactions: Vec, + messages: Vec>, // Option is used for accounts with low write_version + block_meta: Option>, + transactions: Vec>, accounts_dedup: HashMap, // (write_version, message_index) - entries: Vec, + entries: Vec>, sealed: bool, entries_count: usize, confirmed_at: Option, @@ -684,7 +237,7 @@ struct SlotMessages { } impl SlotMessages { - pub fn try_seal(&mut self) -> Option> { + pub fn try_seal(&mut self) -> Option { if !self.sealed { if let Some(block_meta) = &self.block_meta { let executed_transaction_count = block_meta.executed_transaction_count as usize; @@ -703,15 +256,15 @@ impl SlotMessages { let mut accounts = Vec::with_capacity(self.messages.len()); for item in self.messages.iter().flatten() { - if let Message::Account(account) = item.as_ref() { - accounts.push(account.account.clone()); + if let Message::Account(account) = item { + accounts.push(Arc::clone(&account.account)); } } - let message = Arc::new(Message::Block( - (block_meta.clone(), transactions, accounts, entries).into(), + let message = Message::Block(Arc::new( + (Arc::clone(block_meta), transactions, accounts, entries).into(), )); - self.messages.push(Some(Arc::clone(&message))); + self.messages.push(Some(message.clone())); self.sealed = true; self.entries_count = entries_count; @@ -732,7 +285,7 @@ pub struct GrpcService { blocks_meta: Option, subscribe_id: AtomicUsize, snapshot_rx: Mutex>>>, - broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, + broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, debug_clients_tx: Option>, } @@ -745,7 +298,7 @@ impl GrpcService { is_reload: bool, ) -> anyhow::Result<( Option>>, - mpsc::UnboundedSender>, + mpsc::UnboundedSender, Arc, )> { // Bind service address @@ -857,9 +410,9 @@ impl GrpcService { } async fn geyser_loop( - mut messages_rx: mpsc::UnboundedReceiver>, + mut messages_rx: mpsc::UnboundedReceiver, blocks_meta_tx: Option>, - broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, + broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, block_fail_action: ConfigBlockFailAction, ) { const PROCESSED_MESSAGES_MAX: usize = 31; @@ -877,19 +430,19 @@ impl GrpcService { metrics::message_queue_size_dec(); // Update metrics - if let Message::Slot(slot_message) = message.as_ref() { + if let Message::Slot(slot_message) = &message { metrics::update_slot_plugin_status(slot_message.status, slot_message.slot); } // Update blocks info if let Some(blocks_meta_tx) = &blocks_meta_tx { - if matches!(message.as_ref(), Message::Slot(_) | Message::BlockMeta(_)) { - let _ = blocks_meta_tx.send(message.as_ref().clone()); + if matches!(&message, Message::Slot(_) | Message::BlockMeta(_)) { + let _ = blocks_meta_tx.send(message.clone()); } } // Remove outdated block reconstruction info - match message.as_ref() { + match &message { // On startup we can receive few Confirmed/Finalized slots without BlockMeta message // With saved first Processed slot we can ignore errors caused by startup process Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => { @@ -950,7 +503,7 @@ impl GrpcService { // Update block reconstruction info let slot_messages = messages.entry(message.get_slot()).or_default(); - if let Message::Slot(msg) = message.as_ref() { + if let Message::Slot(msg) = &message { match msg.status { CommitmentLevel::Processed => { slot_messages.parent_slot = msg.parent; @@ -963,11 +516,11 @@ impl GrpcService { }, } } - if !matches!(message.as_ref(), Message::Slot(_)) { - slot_messages.messages.push(Some(Arc::clone(&message))); + if !matches!(&message, Message::Slot(_)) { + slot_messages.messages.push(Some(message.clone())); // If we already build Block message, new message will be a problem - if slot_messages.sealed && !(matches!(message.as_ref(), Message::Entry(_)) && slot_messages.entries_count == 0) { + if slot_messages.sealed && !(matches!(&message, Message::Entry(_)) && slot_messages.entries_count == 0) { metrics::update_invalid_blocks(format!("unexpected message {}", message.kind())); match block_fail_action { ConfigBlockFailAction::Log => { @@ -980,7 +533,7 @@ impl GrpcService { } } let mut sealed_block_msg = None; - match message.as_ref() { + match &message { Message::BlockMeta(msg) => { if slot_messages.block_meta.is_some() { metrics::update_invalid_blocks("unexpected message: BlockMeta (duplicate)"); @@ -993,11 +546,11 @@ impl GrpcService { } } } - slot_messages.block_meta = Some(msg.clone()); + slot_messages.block_meta = Some(Arc::clone(msg)); sealed_block_msg = slot_messages.try_seal(); } Message::Transaction(msg) => { - slot_messages.transactions.push(msg.transaction.clone()); + slot_messages.transactions.push(Arc::clone(&msg.transaction)); sealed_block_msg = slot_messages.try_seal(); } // Dedup accounts by max write_version @@ -1015,7 +568,7 @@ impl GrpcService { } } Message::Entry(msg) => { - slot_messages.entries.push(msg.clone()); + slot_messages.entries.push(Arc::clone(msg)); sealed_block_msg = slot_messages.try_seal(); } _ => {} @@ -1026,7 +579,7 @@ impl GrpcService { if let Some(sealed_block_msg) = sealed_block_msg { messages_vec.push(sealed_block_msg); } - let slot_status = if let Message::Slot(msg) = message.as_ref() { + let slot_status = if let Message::Slot(msg) = &message { Some((msg.slot, msg.status)) } else { None @@ -1052,18 +605,18 @@ impl GrpcService { } slots.push(parent); - messages_vec.push(Arc::new(Message::Slot(MessageSlot { + messages_vec.push(Message::Slot(MessageSlot { slot: parent, parent: entry.parent_slot, status, - }))); + })); metrics::missed_status_message_inc(status); } } } for message in messages_vec.into_iter().rev() { - if let Message::Slot(slot) = message.as_ref() { + if let Message::Slot(slot) = &message { let (mut confirmed_messages, mut finalized_messages) = match slot.status { CommitmentLevel::Processed => { (Vec::with_capacity(1), Vec::with_capacity(1)) @@ -1097,7 +650,7 @@ impl GrpcService { }; // processed - processed_messages.push(Arc::clone(&message)); + processed_messages.push(message.clone()); let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into())); processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX); @@ -1106,7 +659,7 @@ impl GrpcService { .reset(Instant::now() + PROCESSED_MESSAGES_SLEEP); // confirmed - confirmed_messages.push(Arc::clone(&message)); + confirmed_messages.push(message.clone()); let _ = broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into())); @@ -1117,7 +670,7 @@ impl GrpcService { } else { let mut confirmed_messages = vec![]; let mut finalized_messages = vec![]; - if matches!(message.as_ref(), Message::Block(_)) { + if matches!(&message, Message::Block(_)) { if let Some(slot_messages) = messages.get(&message.get_slot()) { if let Some(confirmed_at) = slot_messages.confirmed_at { confirmed_messages.extend( @@ -1177,7 +730,7 @@ impl GrpcService { stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, mut snapshot_rx: Option>>, - mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>>)>, + mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>)>, debug_client_tx: Option>, drop_client: impl FnOnce(), ) { @@ -1299,7 +852,7 @@ impl GrpcService { if commitment == CommitmentLevel::Processed && debug_client_tx.is_some() { for message in messages.iter() { - if let Message::Slot(slot_message) = message.as_ref() { + if let Message::Slot(slot_message) = &message { DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateSlot { id, slot: slot_message.slot }); } } diff --git a/yellowstone-grpc-geyser/src/lib.rs b/yellowstone-grpc-geyser/src/lib.rs index 3d68ba9b..c043be3c 100644 --- a/yellowstone-grpc-geyser/src/lib.rs +++ b/yellowstone-grpc-geyser/src/lib.rs @@ -1,14 +1,15 @@ pub mod config; pub mod filters; pub mod grpc; +pub mod message; pub mod metrics; pub mod plugin; pub mod version; pub fn get_thread_name() -> String { - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::atomic::{AtomicU64, Ordering}; - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + static ATOMIC_ID: AtomicU64 = AtomicU64::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed); format!("solGeyserGrpc{id:02}") } diff --git a/yellowstone-grpc-geyser/src/message.rs b/yellowstone-grpc-geyser/src/message.rs new file mode 100644 index 00000000..6438a2cb --- /dev/null +++ b/yellowstone-grpc-geyser/src/message.rs @@ -0,0 +1,228 @@ +use { + agave_geyser_plugin_interface::geyser_plugin_interface::{ + ReplicaAccountInfoV3, ReplicaBlockInfoV4, ReplicaEntryInfoV2, ReplicaTransactionInfoV2, + SlotStatus, + }, + solana_sdk::{ + clock::UnixTimestamp, hash::HASH_BYTES, pubkey::Pubkey, signature::Signature, + transaction::SanitizedTransaction, + }, + solana_transaction_status::{Reward, TransactionStatusMeta}, + std::sync::Arc, + yellowstone_grpc_proto::prelude::CommitmentLevel, +}; + +#[derive(Debug, Clone)] +pub struct MessageAccountInfo { + pub pubkey: Pubkey, + pub lamports: u64, + pub owner: Pubkey, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, + pub write_version: u64, + pub txn_signature: Option, +} + +#[derive(Debug, Clone)] +pub struct MessageAccount { + pub account: Arc, + pub slot: u64, + pub is_startup: bool, +} + +impl<'a> From<(&'a ReplicaAccountInfoV3<'a>, u64, bool)> for MessageAccount { + fn from((account, slot, is_startup): (&'a ReplicaAccountInfoV3<'a>, u64, bool)) -> Self { + Self { + account: Arc::new(MessageAccountInfo { + pubkey: Pubkey::try_from(account.pubkey).expect("valid Pubkey"), + lamports: account.lamports, + owner: Pubkey::try_from(account.owner).expect("valid Pubkey"), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: account.data.into(), + write_version: account.write_version, + txn_signature: account.txn.map(|txn| *txn.signature()), + }), + slot, + is_startup, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MessageSlot { + pub slot: u64, + pub parent: Option, + pub status: CommitmentLevel, +} + +impl From<(u64, Option, SlotStatus)> for MessageSlot { + fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { + Self { + slot, + parent, + status: match status { + SlotStatus::Processed => CommitmentLevel::Processed, + SlotStatus::Confirmed => CommitmentLevel::Confirmed, + SlotStatus::Rooted => CommitmentLevel::Finalized, + }, + } + } +} + +#[derive(Debug, Clone)] +pub struct MessageTransactionInfo { + pub signature: Signature, + pub is_vote: bool, + pub transaction: SanitizedTransaction, + pub meta: TransactionStatusMeta, + pub index: usize, +} + +#[derive(Debug, Clone)] +pub struct MessageTransaction { + pub transaction: Arc, + pub slot: u64, +} + +impl<'a> From<(&'a ReplicaTransactionInfoV2<'a>, u64)> for MessageTransaction { + fn from((transaction, slot): (&'a ReplicaTransactionInfoV2<'a>, u64)) -> Self { + Self { + transaction: Arc::new(MessageTransactionInfo { + signature: *transaction.signature, + is_vote: transaction.is_vote, + transaction: transaction.transaction.clone(), + meta: transaction.transaction_status_meta.clone(), + index: transaction.index, + }), + slot, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MessageEntry { + pub slot: u64, + pub index: usize, + pub num_hashes: u64, + pub hash: [u8; HASH_BYTES], + pub executed_transaction_count: u64, + pub starting_transaction_index: u64, +} + +impl From<&ReplicaEntryInfoV2<'_>> for MessageEntry { + fn from(entry: &ReplicaEntryInfoV2) -> Self { + Self { + slot: entry.slot, + index: entry.index, + num_hashes: entry.num_hashes, + hash: entry.hash[0..32].try_into().expect("failed to create hash"), + executed_transaction_count: entry.executed_transaction_count, + starting_transaction_index: entry + .starting_transaction_index + .try_into() + .expect("failed convert usize to u64"), + } + } +} + +#[derive(Debug, Clone)] +pub struct MessageBlock { + pub meta: Arc, + pub transactions: Vec>, + pub updated_account_count: u64, + pub accounts: Vec>, + pub entries: Vec>, +} + +impl + From<( + Arc, + Vec>, + Vec>, + Vec>, + )> for MessageBlock +{ + fn from( + (meta, transactions, accounts, entries): ( + Arc, + Vec>, + Vec>, + Vec>, + ), + ) -> Self { + Self { + meta, + transactions, + updated_account_count: accounts.len() as u64, + accounts, + entries, + } + } +} + +#[derive(Debug, Clone)] +pub struct MessageBlockMeta { + pub parent_slot: u64, + pub slot: u64, + pub parent_blockhash: String, + pub blockhash: String, + pub rewards: Vec, + pub num_partitions: Option, + pub block_time: Option, + pub block_height: Option, + pub executed_transaction_count: u64, + pub entries_count: u64, +} + +impl<'a> From<&'a ReplicaBlockInfoV4<'a>> for MessageBlockMeta { + fn from(blockinfo: &'a ReplicaBlockInfoV4<'a>) -> Self { + Self { + parent_slot: blockinfo.parent_slot, + slot: blockinfo.slot, + parent_blockhash: blockinfo.parent_blockhash.to_string(), + blockhash: blockinfo.blockhash.to_string(), + rewards: blockinfo.rewards.rewards.clone(), + num_partitions: blockinfo.rewards.num_partitions, + block_time: blockinfo.block_time, + block_height: blockinfo.block_height, + executed_transaction_count: blockinfo.executed_transaction_count, + entries_count: blockinfo.entry_count, + } + } +} + +#[derive(Debug, Clone)] +pub enum Message { + Slot(MessageSlot), + Account(MessageAccount), + Transaction(MessageTransaction), + Entry(Arc), + Block(Arc), + BlockMeta(Arc), +} + +impl Message { + pub fn get_slot(&self) -> u64 { + match self { + Self::Slot(msg) => msg.slot, + Self::Account(msg) => msg.slot, + Self::Transaction(msg) => msg.slot, + Self::Entry(msg) => msg.slot, + Self::Block(msg) => msg.meta.slot, + Self::BlockMeta(msg) => msg.slot, + } + } + + pub const fn kind(&self) -> &'static str { + match self { + Self::Slot(_) => "Slot", + Self::Account(_) => "Account", + Self::Transaction(_) => "Transaction", + Self::Entry(_) => "Entry", + Self::Block(_) => "Block", + Self::BlockMeta(_) => "BlockMeta", + } + } +} diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 7422302a..59f76ea2 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -1,7 +1,8 @@ use { crate::{ config::Config, - grpc::{GrpcService, Message}, + grpc::GrpcService, + message::Message, metrics::{self, PrometheusService}, }, agave_geyser_plugin_interface::geyser_plugin_interface::{ @@ -28,14 +29,14 @@ pub struct PluginInner { runtime: Runtime, snapshot_channel: Mutex>>>, snapshot_channel_closed: AtomicBool, - grpc_channel: mpsc::UnboundedSender>, + grpc_channel: mpsc::UnboundedSender, grpc_shutdown: Arc, prometheus: PrometheusService, } impl PluginInner { fn send_message(&self, message: Message) { - if self.grpc_channel.send(Arc::new(message)).is_ok() { + if self.grpc_channel.send(message).is_ok() { metrics::message_queue_size_inc(); } } @@ -211,7 +212,7 @@ impl GeyserPlugin for Plugin { ReplicaEntryInfoVersions::V0_0_2(entry) => entry, }; - let message = Message::Entry(entry.into()); + let message = Message::Entry(Arc::new(entry.into())); inner.send_message(message); Ok(()) @@ -233,7 +234,7 @@ impl GeyserPlugin for Plugin { ReplicaBlockInfoVersions::V0_0_4(info) => info, }; - let message = Message::BlockMeta(blockinfo.into()); + let message = Message::BlockMeta(Arc::new(blockinfo.into())); inner.send_message(message); Ok(())