From 6283315551c795f33a6f09706691f13321e0cefc Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 10 Dec 2023 11:29:39 -0500 Subject: [PATCH] proto: add more convert functions (#264) (cherry picked from commit bc235ed298c7ace3c0397cb4775d8343262930d4) --- CHANGELOG.md | 2 + yellowstone-grpc-client/src/lib.rs | 39 ++++-- yellowstone-grpc-geyser/src/grpc.rs | 4 +- yellowstone-grpc-proto/src/lib.rs | 205 ++++++++++++++++------------ 4 files changed, 147 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ebf16df3..1b51082f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +- proto: add more convert functions ([#264](https://github.com/rpcpool/yellowstone-grpc/pull/264)) + ### Breaking ## 2023-12-08 diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index fdebe71a..5afe591d 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -28,11 +28,11 @@ use { }; #[derive(Debug, Clone)] -struct InterceptorFn { +pub struct InterceptorXToken { x_token: Option, } -impl Interceptor for InterceptorFn { +impl Interceptor for InterceptorXToken { fn call(&mut self, mut request: Request<()>) -> Result, Status> { if let Some(x_token) = self.x_token.clone() { request.metadata_mut().insert("x-token", x_token); @@ -65,11 +65,15 @@ pub struct GeyserGrpcClient { } impl GeyserGrpcClient<()> { + pub const fn max_decoding_message_size() -> usize { + 64 * 1024 * 1024 // 64 MiB + } + fn connect2( endpoint: E, tls_config: Option, x_token: Option, - ) -> GeyserGrpcClientResult<(Endpoint, InterceptorFn)> + ) -> GeyserGrpcClientResult<(Endpoint, InterceptorXToken)> where E: Into, T: TryInto, @@ -91,7 +95,7 @@ impl GeyserGrpcClient<()> { } _ => {} } - let interceptor = InterceptorFn { x_token }; + let interceptor = InterceptorXToken { x_token }; Ok((endpoint, interceptor)) } @@ -107,11 +111,11 @@ impl GeyserGrpcClient<()> { { let (endpoint, interceptor) = Self::connect2(endpoint, tls_config, x_token)?; let channel = endpoint.connect_lazy(); - Ok(GeyserGrpcClient { - health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()), - geyser: GeyserClient::with_interceptor(channel, interceptor) - .max_decoding_message_size(64 * 1024 * 1024), // 64 MiB - }) + Ok(GeyserGrpcClient::new( + HealthClient::with_interceptor(channel.clone(), interceptor.clone()), + GeyserClient::with_interceptor(channel, interceptor) + .max_decoding_message_size(Self::max_decoding_message_size()), + )) } pub async fn connect_with_timeout( @@ -140,15 +144,22 @@ impl GeyserGrpcClient<()> { endpoint.connect().await? }; - Ok(GeyserGrpcClient { - health: HealthClient::with_interceptor(channel.clone(), interceptor.clone()), - geyser: GeyserClient::with_interceptor(channel, interceptor) - .max_decoding_message_size(64 * 1024 * 1024), // 64 MiB - }) + Ok(GeyserGrpcClient::new( + HealthClient::with_interceptor(channel.clone(), interceptor.clone()), + GeyserClient::with_interceptor(channel, interceptor) + .max_decoding_message_size(Self::max_decoding_message_size()), + )) } } impl GeyserGrpcClient { + pub fn new( + health: HealthClient>, + geyser: GeyserClient>, + ) -> Self { + Self { health, geyser } + } + pub async fn health_check(&mut self) -> GeyserGrpcClientResult { let request = HealthCheckRequest { service: "geyser.Geyser".to_owned(), diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 497805d1..c9276fb4 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -416,7 +416,7 @@ impl<'a> MessageRef<'a> { Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { slot: message.slot, blockhash: message.blockhash.clone(), - rewards: Some(convert_to::create_rewards(message.rewards.as_slice())), + rewards: Some(convert_to::create_rewards_obj(message.rewards.as_slice())), block_time: message.block_time.map(convert_to::create_timestamp), block_height: message.block_height.map(convert_to::create_block_height), parent_slot: message.parent_slot, @@ -443,7 +443,7 @@ impl<'a> MessageRef<'a> { Self::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta { slot: message.slot, blockhash: message.blockhash.clone(), - rewards: Some(convert_to::create_rewards(message.rewards.as_slice())), + rewards: Some(convert_to::create_rewards_obj(message.rewards.as_slice())), block_time: message.block_time.map(convert_to::create_timestamp), block_height: message.block_height.map(convert_to::create_block_height), parent_slot: message.parent_slot, diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index 8e3ead21..f4d5bb60 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -56,11 +56,7 @@ pub mod convert_to { match message { SanitizedMessage::Legacy(LegacyMessage { message, .. }) => proto::Message { header: Some(create_header(&message.header)), - account_keys: message - .account_keys - .iter() - .map(|key| >::as_ref(key).into()) - .collect(), + account_keys: create_pubkeys(&message.account_keys), recent_blockhash: message.recent_blockhash.to_bytes().into(), instructions: message .instructions @@ -72,23 +68,11 @@ pub mod convert_to { }, SanitizedMessage::V0(LoadedMessage { message, .. }) => proto::Message { header: Some(create_header(&message.header)), - account_keys: message - .account_keys - .iter() - .map(|key| >::as_ref(key).into()) - .collect(), + account_keys: create_pubkeys(&message.account_keys), recent_blockhash: message.recent_blockhash.to_bytes().into(), - instructions: message - .instructions - .iter() - .map(create_instruction) - .collect(), + instructions: create_instructions(&message.instructions), versioned: true, - address_table_lookups: message - .address_table_lookups - .iter() - .map(create_lookup) - .collect(), + address_table_lookups: create_lookups(&message.address_table_lookups), }, } } @@ -101,6 +85,17 @@ pub mod convert_to { } } + pub fn create_pubkeys(pubkeys: &[Pubkey]) -> Vec> { + pubkeys + .iter() + .map(|key| >::as_ref(key).into()) + .collect() + } + + pub fn create_instructions(ixs: &[CompiledInstruction]) -> Vec { + ixs.iter().map(create_instruction).collect() + } + pub fn create_instruction(ix: &CompiledInstruction) -> proto::CompiledInstruction { proto::CompiledInstruction { program_id_index: ix.program_id_index as u32, @@ -109,6 +104,12 @@ pub mod convert_to { } } + pub fn create_lookups( + lookups: &[MessageAddressTableLookup], + ) -> Vec { + lookups.iter().map(create_lookup).collect() + } + pub fn create_lookup(lookup: &MessageAddressTableLookup) -> proto::MessageAddressTableLookup { proto::MessageAddressTableLookup { account_key: >::as_ref(&lookup.account_key).into(), @@ -140,33 +141,22 @@ pub mod convert_to { }; let inner_instructions_none = inner_instructions.is_none(); let inner_instructions = inner_instructions - .as_ref() - .map(|v| v.iter().map(create_inner_instructions).collect()) + .as_deref() + .map(create_inner_instructions_vec) .unwrap_or_default(); let log_messages_none = log_messages.is_none(); let log_messages = log_messages.clone().unwrap_or_default(); let pre_token_balances = pre_token_balances - .as_ref() - .map(|v| v.iter().map(create_token_balance).collect()) + .as_deref() + .map(create_token_balances) .unwrap_or_default(); let post_token_balances = post_token_balances - .as_ref() - .map(|v| v.iter().map(create_token_balance).collect()) - .unwrap_or_default(); - let rewards = rewards - .as_ref() - .map(|vec| vec.iter().map(create_reward).collect()) + .as_deref() + .map(create_token_balances) .unwrap_or_default(); - let loaded_writable_addresses = loaded_addresses - .writable - .iter() - .map(|key| >::as_ref(key).into()) - .collect(); - let loaded_readonly_addresses = loaded_addresses - .readonly - .iter() - .map(|key| >::as_ref(key).into()) - .collect(); + let rewards = rewards.as_deref().map(create_rewards).unwrap_or_default(); + let loaded_writable_addresses = create_pubkeys(&loaded_addresses.writable); + let loaded_readonly_addresses = create_pubkeys(&loaded_addresses.readonly); proto::TransactionStatusMeta { err, @@ -188,17 +178,23 @@ pub mod convert_to { } } + pub fn create_inner_instructions_vec( + ixs: &[InnerInstructions], + ) -> Vec { + ixs.iter().map(create_inner_instructions).collect() + } + pub fn create_inner_instructions(instructions: &InnerInstructions) -> proto::InnerInstructions { proto::InnerInstructions { index: instructions.index as u32, - instructions: instructions - .instructions - .iter() - .map(create_inner_instruction) - .collect(), + instructions: create_inner_instruction_vec(&instructions.instructions), } } + pub fn create_inner_instruction_vec(ixs: &[InnerInstruction]) -> Vec { + ixs.iter().map(create_inner_instruction).collect() + } + pub fn create_inner_instruction(instruction: &InnerInstruction) -> proto::InnerInstruction { proto::InnerInstruction { program_id_index: instruction.instruction.program_id_index as u32, @@ -208,6 +204,10 @@ pub mod convert_to { } } + pub fn create_token_balances(balances: &[TransactionTokenBalance]) -> Vec { + balances.iter().map(create_token_balance).collect() + } + pub fn create_token_balance(balance: &TransactionTokenBalance) -> proto::TokenBalance { proto::TokenBalance { account_index: balance.account_index as u32, @@ -223,6 +223,16 @@ pub mod convert_to { } } + pub fn create_rewards_obj(rewards: &[Reward]) -> proto::Rewards { + proto::Rewards { + rewards: create_rewards(rewards), + } + } + + pub fn create_rewards(rewards: &[Reward]) -> Vec { + rewards.iter().map(create_reward).collect() + } + pub fn create_reward(reward: &Reward) -> proto::Reward { proto::Reward { pubkey: reward.pubkey.clone(), @@ -246,12 +256,6 @@ pub mod convert_to { } } - pub fn create_rewards(rewards: &[Reward]) -> proto::Rewards { - proto::Rewards { - rewards: rewards.iter().map(create_reward).collect(), - } - } - pub const fn create_block_height(block_height: u64) -> proto::BlockHeight { proto::BlockHeight { block_height } } @@ -266,6 +270,7 @@ pub mod convert_from { super::prelude as proto, solana_account_decoder::parse_token::UiTokenAmount, solana_sdk::{ + account::Account, hash::{Hash, HASH_BYTES}, instruction::CompiledInstruction, message::{ @@ -348,7 +353,7 @@ pub mod convert_from { }) } - fn create_message(message: proto::Message) -> Result { + pub fn create_message(message: proto::Message) -> Result { let header = ensure_some(message.header, "failed to get MessageHeader")?; let header = MessageHeader { num_required_signatures: ensure_some( @@ -369,18 +374,6 @@ pub mod convert_from { return Err("failed to parse hash".to_owned()); } - let mut instructions = Vec::with_capacity(message.instructions.len()); - for ix in message.instructions { - instructions.push(CompiledInstruction { - program_id_index: ensure_some( - ix.program_id_index.try_into().ok(), - "failed to decode CompiledInstruction.program_id_index)", - )?, - accounts: ix.accounts, - data: ix.data, - }); - } - Ok(if message.versioned { let mut address_table_lookups = Vec::with_capacity(message.address_table_lookups.len()); for table in message.address_table_lookups { @@ -398,7 +391,7 @@ pub mod convert_from { header, account_keys: create_pubkey_vec(message.account_keys)?, recent_blockhash: Hash::new(message.recent_blockhash.as_slice()), - instructions, + instructions: create_message_instructions(message.instructions)?, address_table_lookups, }) } else { @@ -406,11 +399,30 @@ pub mod convert_from { header, account_keys: create_pubkey_vec(message.account_keys)?, recent_blockhash: Hash::new(message.recent_blockhash.as_slice()), - instructions, + instructions: create_message_instructions(message.instructions)?, }) }) } + pub fn create_message_instructions( + ixs: Vec, + ) -> Result, String> { + ixs.into_iter().map(create_message_instruction).collect() + } + + pub fn create_message_instruction( + ix: proto::CompiledInstruction, + ) -> Result { + Ok(CompiledInstruction { + program_id_index: ensure_some( + ix.program_id_index.try_into().ok(), + "failed to decode CompiledInstruction.program_id_index)", + )?, + accounts: ix.accounts, + data: ix.data, + }) + } + pub fn create_tx_meta( meta: proto::TransactionStatusMeta, ) -> Result { @@ -418,21 +430,18 @@ pub mod convert_from { Some(err) => Err(err), None => Ok(()), }; - let mut meta_inner_instructions = vec![]; - for ix in meta.inner_instructions { - meta_inner_instructions.push(create_inner_instruction(ix)?); - } - let mut meta_rewards = vec![]; - for reward in meta.rewards { - meta_rewards.push(create_reward(reward)?); - } + let meta_rewards = meta + .rewards + .into_iter() + .map(create_reward) + .collect::, _>>()?; Ok(TransactionStatusMeta { status: meta_status, fee: meta.fee, pre_balances: meta.pre_balances, post_balances: meta.post_balances, - inner_instructions: Some(meta_inner_instructions), + inner_instructions: Some(create_meta_inner_instructions(meta.inner_instructions)?), log_messages: Some(meta.log_messages), pre_token_balances: Some(create_token_balances(meta.pre_token_balances)?), post_token_balances: Some(create_token_balances(meta.post_token_balances)?), @@ -468,7 +477,15 @@ pub mod convert_from { ) } - fn create_inner_instruction(ix: proto::InnerInstructions) -> Result { + pub fn create_meta_inner_instructions( + ixs: Vec, + ) -> Result, String> { + ixs.into_iter().map(create_meta_inner_instruction).collect() + } + + pub fn create_meta_inner_instruction( + ix: proto::InnerInstructions, + ) -> Result { let mut instructions = vec![]; for ix in ix.instructions { instructions.push(InnerInstruction { @@ -518,7 +535,7 @@ pub mod convert_from { }) } - fn create_token_balances( + pub fn create_token_balances( balances: Vec, ) -> Result, String> { let mut vec = Vec::with_capacity(balances.len()); @@ -546,7 +563,7 @@ pub mod convert_from { Ok(vec) } - fn create_loaded_addresses( + pub fn create_loaded_addresses( writable: Vec>, readonly: Vec>, ) -> Result { @@ -556,14 +573,28 @@ pub mod convert_from { }) } - fn create_pubkey_vec(pubkeys: Vec>) -> Result, String> { - let mut vec = Vec::with_capacity(pubkeys.len()); - for pubkey in pubkeys { - vec.push(ensure_some( - Pubkey::try_from(pubkey.as_slice()).ok(), - "failed to parse Pubkey", - )?) - } - Ok(vec) + pub fn create_pubkey_vec(pubkeys: Vec>) -> Result, String> { + pubkeys + .iter() + .map(|pubkey| create_pubkey(pubkey.as_slice())) + .collect() + } + + pub fn create_pubkey(pubkey: &[u8]) -> Result { + ensure_some(Pubkey::try_from(pubkey).ok(), "failed to parse Pubkey") + } + + pub fn create_account( + account: proto::SubscribeUpdateAccountInfo, + ) -> Result<(Pubkey, Account), String> { + let pubkey = create_pubkey(&account.pubkey)?; + let account = Account { + lamports: account.lamports, + data: account.data, + owner: create_pubkey(&account.owner)?, + executable: account.executable, + rent_epoch: account.rent_epoch, + }; + Ok((pubkey, account)) } }