From b4797bc048a8ae04d75b10f36fd914689fd2b0b6 Mon Sep 17 00:00:00 2001 From: Satyam Kulkarni Date: Fri, 7 Apr 2023 13:33:41 +0530 Subject: [PATCH 1/2] feat: implement get_filter_changes --- client/src/client.rs | 4 +++ client/src/node.rs | 5 ++++ client/src/rpc.rs | 7 ++++++ execution/src/execution.rs | 47 +++++++++++++++++++++++++++++++++++ execution/src/rpc/http_rpc.rs | 10 ++++++++ execution/src/rpc/mock_rpc.rs | 7 +++++- execution/src/rpc/mod.rs | 3 ++- 7 files changed, 81 insertions(+), 2 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index a2e7f6a4..318b94df 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -567,4 +567,8 @@ impl Client { pub async fn get_coinbase(&self) -> Result
{ self.node.read().await.get_coinbase() } + + pub async fn get_filter_changes(&self, filter_id: U256) -> Result> { + self.node.read().await.get_filter_changes(filter_id).await + } } diff --git a/client/src/node.rs b/client/src/node.rs index 75c299d8..f98aa051 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -264,6 +264,11 @@ impl Node { self.execution.get_logs(filter, &self.payloads).await } + + pub async fn get_filter_changes(&self, filter_id: U256) -> Result> { + self.execution.get_filter_changes(filter_id, &self.payloads).await + } + // assumes tip of 1 gwei to prevent having to prove out every tx in the block pub fn get_gas_price(&self) -> Result { self.check_head_age()?; diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 26037150..1c4defd9 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -116,6 +116,8 @@ trait EthRpc { async fn get_coinbase(&self) -> Result; #[method(name = "syncing")] async fn syncing(&self) -> Result; + #[method(name = "getFilterChanges")] + async fn get_filter_changes(&self, filter_id: U256) -> Result, Error>; } #[rpc(client, server, namespace = "net")] @@ -302,6 +304,11 @@ impl EthRpcServer for RpcInner { Ok(format_hex(&storage)) } + + async fn get_filter_changes(&self, filter_id: U256) -> Result, Error> { + let node = self.node.read().await; + convert_err(node.get_filter_changes(filter_id).await) + } } #[async_trait] diff --git a/execution/src/execution.rs b/execution/src/execution.rs index 38768d94..739523bb 100644 --- a/execution/src/execution.rs +++ b/execution/src/execution.rs @@ -346,6 +346,53 @@ impl ExecutionClient { Ok(logs) } + pub async fn get_filter_changes( + &self, + filter_id: U256, + payloads: &BTreeMap, + ) -> Result> { + let filter_id = filter_id.clone(); + + let logs = self.rpc.get_filter_changes(filter_id).await?; + if logs.len() > MAX_SUPPORTED_LOGS_NUMBER { + return Err( + ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(), + ); + } + + for (_pos, log) in logs.iter().enumerate() { + // For every log + // Get the hash of the tx that generated it + let tx_hash = log + .transaction_hash + .ok_or(eyre::eyre!("tx hash not found in log"))?; + // Get its proven receipt + let receipt = self + .get_transaction_receipt(&tx_hash, payloads) + .await? + .ok_or(ExecutionError::NoReceiptForTransaction(tx_hash.to_string()))?; + + // Check if the receipt contains the desired log + // Encoding logs for comparison + let receipt_logs_encoded = receipt + .logs + .iter() + .map(|log| log.rlp_bytes()) + .collect::>(); + + let log_encoded = log.rlp_bytes(); + + if !receipt_logs_encoded.contains(&log_encoded) { + return Err(ExecutionError::MissingLog( + tx_hash.to_string(), + log.log_index.unwrap(), + ) + .into()); + } + } + Ok(logs) + } + pub async fn get_fee_history( &self, block_count: u64, diff --git a/execution/src/rpc/http_rpc.rs b/execution/src/rpc/http_rpc.rs index 9c0f9b2f..82ee83b4 100644 --- a/execution/src/rpc/http_rpc.rs +++ b/execution/src/rpc/http_rpc.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::str::FromStr; use async_trait::async_trait; @@ -10,6 +11,8 @@ use ethers::types::{ Filter, Log, Transaction, TransactionReceipt, H256, U256, }; use eyre::Result; +use serde::Deserialize; +use serde::{de::DeserializeOwned, Serialize}; use crate::types::CallOpts; use common::errors::RpcError; @@ -154,4 +157,11 @@ impl ExecutionRpc for HttpRpc { .await .map_err(|e| RpcError::new("fee_history", e))?) } + + async fn get_filter_changes(&self, filter_id: U256) -> Result>{ + Ok(self + .provider.get_filter_changes(filter_id) + .await + .map_err(|e| RpcError::new("get_filter_changes", e))?) + } } diff --git a/execution/src/rpc/mock_rpc.rs b/execution/src/rpc/mock_rpc.rs index 42bec5ef..202df53f 100644 --- a/execution/src/rpc/mock_rpc.rs +++ b/execution/src/rpc/mock_rpc.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use common::utils::hex_str_to_bytes; use ethers::types::{ transaction::eip2930::AccessList, Address, EIP1186ProofResponse, FeeHistory, Filter, Log, - Transaction, TransactionReceipt, H256, + Transaction, TransactionReceipt, H256, U256, }; use eyre::{eyre, Result}; @@ -76,4 +76,9 @@ impl ExecutionRpc for MockRpc { let fee_history = read_to_string(self.path.join("fee_history.json"))?; Ok(serde_json::from_str(&fee_history)?) } + + async fn get_filter_changes(&self, filter_id: U256) -> Result> { + let logs = read_to_string(self.path.join("logs.json"))?; + Ok(serde_json::from_str(&logs)?) + } } diff --git a/execution/src/rpc/mod.rs b/execution/src/rpc/mod.rs index d4dd8a58..476f97a6 100644 --- a/execution/src/rpc/mod.rs +++ b/execution/src/rpc/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use ethers::types::{ transaction::eip2930::AccessList, Address, EIP1186ProofResponse, FeeHistory, Filter, Log, - Transaction, TransactionReceipt, H256, + Transaction, TransactionReceipt, H256, U256, }; use eyre::Result; @@ -37,4 +37,5 @@ pub trait ExecutionRpc: Send + Clone + Sync + 'static { last_block: u64, reward_percentiles: &[f64], ) -> Result; + async fn get_filter_changes(&self, filter_id: U256) -> Result>; } From 6d24e7276461bb1a037d9fc3e6075c216a6fac8e Mon Sep 17 00:00:00 2001 From: Satyam Kulkarni Date: Fri, 5 May 2023 05:10:02 +0530 Subject: [PATCH 2/2] fix: updated problem --- client/src/client.rs | 4 +- client/src/node.rs | 4 +- client/src/rpc.rs | 6 +-- config/src/types.rs | 2 +- execution/src/execution.rs | 87 ++++++++++++++++++++--------------- execution/src/rpc/http_rpc.rs | 38 +++++++++++---- execution/src/rpc/mock_rpc.rs | 7 +-- execution/src/rpc/mod.rs | 7 ++- execution/src/types.rs | 8 +++- 9 files changed, 101 insertions(+), 62 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index 318b94df..044452dd 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -11,7 +11,7 @@ use eyre::{eyre, Result}; use common::types::BlockTag; use config::{CheckpointFallback, Config}; use consensus::{types::Header, ConsensusClient}; -use execution::types::{CallOpts, ExecutionBlock}; +use execution::types::{CallOpts, ExecutionBlock, FilterChangesReturnType}; use log::{error, info, warn}; use tokio::sync::RwLock; @@ -568,7 +568,7 @@ impl Client { self.node.read().await.get_coinbase() } - pub async fn get_filter_changes(&self, filter_id: U256) -> Result> { + pub async fn get_filter_changes(&self, filter_id: U256) -> Result { self.node.read().await.get_filter_changes(filter_id).await } } diff --git a/client/src/node.rs b/client/src/node.rs index f98aa051..b1ed00c1 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -17,7 +17,7 @@ use consensus::types::{ExecutionPayload, Header}; use consensus::ConsensusClient; use execution::evm::Evm; use execution::rpc::http_rpc::HttpRpc; -use execution::types::{CallOpts, ExecutionBlock}; +use execution::types::{CallOpts, ExecutionBlock, FilterChangesReturnType}; use execution::ExecutionClient; use crate::errors::NodeError; @@ -265,7 +265,7 @@ impl Node { } - pub async fn get_filter_changes(&self, filter_id: U256) -> Result> { + pub async fn get_filter_changes(&self, filter_id: U256) -> Result { self.execution.get_filter_changes(filter_id, &self.payloads).await } diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 1c4defd9..399d0d5f 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -19,7 +19,7 @@ use common::{ types::BlockTag, utils::{hex_str_to_bytes, u64_to_hex_string}, }; -use execution::types::{CallOpts, ExecutionBlock}; +use execution::types::{CallOpts, ExecutionBlock, FilterChangesReturnType}; pub struct Rpc { node: Arc>, @@ -117,7 +117,7 @@ trait EthRpc { #[method(name = "syncing")] async fn syncing(&self) -> Result; #[method(name = "getFilterChanges")] - async fn get_filter_changes(&self, filter_id: U256) -> Result, Error>; + async fn get_filter_changes(&self, filter_id: U256) -> Result; } #[rpc(client, server, namespace = "net")] @@ -305,7 +305,7 @@ impl EthRpcServer for RpcInner { Ok(format_hex(&storage)) } - async fn get_filter_changes(&self, filter_id: U256) -> Result, Error> { + async fn get_filter_changes(&self, filter_id: U256) -> Result { let node = self.node.read().await; convert_err(node.get_filter_changes(filter_id).await) } diff --git a/config/src/types.rs b/config/src/types.rs index aca1d28e..5405b161 100644 --- a/config/src/types.rs +++ b/config/src/types.rs @@ -29,4 +29,4 @@ pub struct Fork { serialize_with = "bytes_serialize" )] pub fork_version: Vec, -} +} \ No newline at end of file diff --git a/execution/src/execution.rs b/execution/src/execution.rs index 739523bb..7a3203a7 100644 --- a/execution/src/execution.rs +++ b/execution/src/execution.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, HashMap}; +use std::fmt::Debug; use std::str::FromStr; use ethers::abi::AbiEncode; @@ -12,10 +13,12 @@ use common::utils::hex_str_to_bytes; use consensus::types::ExecutionPayload; use futures::future::join_all; use revm::KECCAK_EMPTY; +use serde::Serialize; +use serde::de::DeserializeOwned; use triehash_ethereum::ordered_trie_root; use crate::errors::ExecutionError; -use crate::types::Transactions; +use crate::types::{Transactions, FilterChangesReturnType}; use super::proof::{encode_account, verify_proof}; use super::rpc::ExecutionRpc; @@ -350,47 +353,55 @@ impl ExecutionClient { &self, filter_id: U256, payloads: &BTreeMap, - ) -> Result> { + ) -> Result{ let filter_id = filter_id.clone(); - let logs = self.rpc.get_filter_changes(filter_id).await?; - if logs.len() > MAX_SUPPORTED_LOGS_NUMBER { - return Err( - ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(), - ); - } - - for (_pos, log) in logs.iter().enumerate() { - // For every log - // Get the hash of the tx that generated it - let tx_hash = log - .transaction_hash - .ok_or(eyre::eyre!("tx hash not found in log"))?; - // Get its proven receipt - let receipt = self - .get_transaction_receipt(&tx_hash, payloads) - .await? - .ok_or(ExecutionError::NoReceiptForTransaction(tx_hash.to_string()))?; - - // Check if the receipt contains the desired log - // Encoding logs for comparison - let receipt_logs_encoded = receipt - .logs - .iter() - .map(|log| log.rlp_bytes()) - .collect::>(); - - let log_encoded = log.rlp_bytes(); - - if !receipt_logs_encoded.contains(&log_encoded) { - return Err(ExecutionError::MissingLog( - tx_hash.to_string(), - log.log_index.unwrap(), - ) - .into()); + let filter_return = self.rpc.get_filter_changes(filter_id).await?; + + match filter_return { + FilterChangesReturnType::Log(logs) => { + if logs.len() > MAX_SUPPORTED_LOGS_NUMBER { + return Err( + ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(), + ); + } + + for (_pos, log) in logs.iter().enumerate() { + // For every log + // Get the hash of the tx that generated it + let tx_hash = log + .transaction_hash + .ok_or(eyre::eyre!("tx hash not found in log"))?; + // Get its proven receipt + let receipt = self + .get_transaction_receipt(&tx_hash, payloads) + .await? + .ok_or(ExecutionError::NoReceiptForTransaction(tx_hash.to_string()))?; + + // Check if the receipt contains the desired log + // Encoding logs for comparison + let receipt_logs_encoded = receipt + .logs + .iter() + .map(|log| log.rlp_bytes()) + .collect::>(); + + let log_encoded = log.rlp_bytes(); + + if !receipt_logs_encoded.contains(&log_encoded) { + return Err(ExecutionError::MissingLog( + tx_hash.to_string(), + log.log_index.unwrap(), + ) + .into()); + } + } + Ok(FilterChangesReturnType::Log(logs)) + }, + FilterChangesReturnType::H256(h256s) => { + Ok(FilterChangesReturnType::H256(h256s)) } } - Ok(logs) } pub async fn get_fee_history( diff --git a/execution/src/rpc/http_rpc.rs b/execution/src/rpc/http_rpc.rs index 82ee83b4..c1934473 100644 --- a/execution/src/rpc/http_rpc.rs +++ b/execution/src/rpc/http_rpc.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::any::{Any, TypeId}; use std::str::FromStr; use async_trait::async_trait; @@ -11,10 +11,8 @@ use ethers::types::{ Filter, Log, Transaction, TransactionReceipt, H256, U256, }; use eyre::Result; -use serde::Deserialize; -use serde::{de::DeserializeOwned, Serialize}; -use crate::types::CallOpts; +use crate::types::{CallOpts, FilterChangesReturnType}; use common::errors::RpcError; use super::ExecutionRpc; @@ -30,6 +28,20 @@ impl Clone for HttpRpc { } } + +// test function, will remove before commiting it for production +fn convert_to_enum(value: &dyn std::any::Any) -> Option { + if let Some(logs) = value.downcast_ref::>() { + Some(FilterChangesReturnType::Log(logs.clone())) + } else if let Some(hashes) = value.downcast_ref::>() { + Some(FilterChangesReturnType::H256(hashes.clone())) + } else { + None + } + +} + + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ExecutionRpc for HttpRpc { @@ -158,10 +170,16 @@ impl ExecutionRpc for HttpRpc { .map_err(|e| RpcError::new("fee_history", e))?) } - async fn get_filter_changes(&self, filter_id: U256) -> Result>{ - Ok(self - .provider.get_filter_changes(filter_id) - .await - .map_err(|e| RpcError::new("get_filter_changes", e))?) + async fn get_filter_changes(&self, filter_id: U256) -> Result { + let x = self + .provider.get_filter_changes(filter_id) + .await + .map_err(|e| RpcError::new("get_filter_changes", e)).unwrap(); + + // let y: FilterChangesReturnType = convert_to_enum(&x).unwrap(); + + // how to determine between log and h256?? + + Ok(FilterChangesReturnType::Log(x)) } -} +} \ No newline at end of file diff --git a/execution/src/rpc/mock_rpc.rs b/execution/src/rpc/mock_rpc.rs index 202df53f..3a92ea43 100644 --- a/execution/src/rpc/mock_rpc.rs +++ b/execution/src/rpc/mock_rpc.rs @@ -1,4 +1,4 @@ -use std::{fs::read_to_string, path::PathBuf}; +use std::{fs::read_to_string, path::PathBuf, fmt::Debug}; use async_trait::async_trait; use common::utils::hex_str_to_bytes; @@ -7,8 +7,9 @@ use ethers::types::{ Transaction, TransactionReceipt, H256, U256, }; use eyre::{eyre, Result}; +use serde::{de::DeserializeOwned, Serialize}; -use crate::types::CallOpts; +use crate::types::{CallOpts, FilterChangesReturnType}; use super::ExecutionRpc; @@ -77,7 +78,7 @@ impl ExecutionRpc for MockRpc { Ok(serde_json::from_str(&fee_history)?) } - async fn get_filter_changes(&self, filter_id: U256) -> Result> { + async fn get_filter_changes(&self, filter_id: U256) -> Result { let logs = read_to_string(self.path.join("logs.json"))?; Ok(serde_json::from_str(&logs)?) } diff --git a/execution/src/rpc/mod.rs b/execution/src/rpc/mod.rs index 476f97a6..7dcffd93 100644 --- a/execution/src/rpc/mod.rs +++ b/execution/src/rpc/mod.rs @@ -1,11 +1,14 @@ +use std::fmt::Debug; + use async_trait::async_trait; use ethers::types::{ transaction::eip2930::AccessList, Address, EIP1186ProofResponse, FeeHistory, Filter, Log, Transaction, TransactionReceipt, H256, U256, }; use eyre::Result; +use serde::{de::DeserializeOwned, Serialize}; -use crate::types::CallOpts; +use crate::types::{CallOpts, FilterChangesReturnType}; pub mod http_rpc; pub mod mock_rpc; @@ -37,5 +40,5 @@ pub trait ExecutionRpc: Send + Clone + Sync + 'static { last_block: u64, reward_percentiles: &[f64], ) -> Result; - async fn get_filter_changes(&self, filter_id: U256) -> Result>; + async fn get_filter_changes(&self, filter_id: U256) -> Result; } diff --git a/execution/src/types.rs b/execution/src/types.rs index eea31e4e..0e09a89c 100644 --- a/execution/src/types.rs +++ b/execution/src/types.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, fmt}; use ethers::{ prelude::{Address, H256, U256}, - types::Transaction, + types::{Transaction, Log}, }; use eyre::Result; use serde::{ser::SerializeSeq, Deserialize, Serialize}; @@ -19,6 +19,12 @@ pub struct Account { pub slots: HashMap, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum FilterChangesReturnType { + Log(Vec), + H256(Vec) +} + #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct ExecutionBlock {