From a20d26ff2f9eb014483e55de7b84f5ec768ba6a5 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 2 Sep 2024 15:54:26 +0530 Subject: [PATCH 1/6] graph, chain/ethereum: use block ptrs when fetching blocks for composed subgraphs --- chain/arweave/src/chain.rs | 2 +- chain/cosmos/src/chain.rs | 6 +- chain/ethereum/src/adapter.rs | 5 +- chain/ethereum/src/chain.rs | 42 +++++--- chain/ethereum/src/ethereum_adapter.rs | 64 ++++++----- chain/near/src/chain.rs | 2 +- chain/starknet/src/chain.rs | 2 +- chain/substreams/src/trigger.rs | 2 +- graph/src/blockchain/block_stream.rs | 4 +- graph/src/blockchain/mock.rs | 2 +- graph/src/blockchain/mod.rs | 2 +- graph/src/blockchain/types.rs | 142 ++++++++++++++++++++++++- graph/src/components/store/traits.rs | 2 +- node/src/chain.rs | 3 +- store/postgres/src/chain_store.rs | 10 +- tests/src/fixture/mod.rs | 2 +- 16 files changed, 228 insertions(+), 64 deletions(-) diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index b0cb436e449..195abc09994 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -269,7 +269,7 @@ impl TriggersAdapterTrait for TriggersAdapter { })) } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index 3883556ba5a..0f07c3aaaf9 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -197,15 +197,13 @@ impl TriggersAdapterTrait for TriggersAdapter { ) -> Result, Error> { panic!("Should never be called since not used by FirehoseBlockStream") } - - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, ) -> Result, Error> { - unimplemented!() + todo!() } - async fn chain_head_ptr(&self) -> Result, Error> { unimplemented!() } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 3d4dc00c030..c969f5521d9 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,5 +1,6 @@ use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; +use graph::blockchain::BlockPtrExt; use graph::blockchain::ChainIdentifier; use graph::components::subgraph::MappingError; use graph::data::store::ethereum::call; @@ -1109,12 +1110,12 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_hash: H256, ) -> Box + Send>; - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _chain_store: Arc, _block_numbers: HashSet, - ) -> Box, Error = Error> + Send>; + ) -> Box, Error = Error> + Send>; /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 5a2a23687eb..ac2422ff761 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -3,7 +3,7 @@ use anyhow::{Context, Error}; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms}; use graph::blockchain::{ - BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper, + BlockIngestor, BlockPtrExt, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper, TriggersAdapterSelector, }; use graph::components::adapter::ChainId; @@ -156,6 +156,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { unified_api_version: UnifiedMappingApiVersion, ) -> Result>> { let requirements = filter.chain_filter.node_capabilities(); + let is_using_subgraph_composition = !source_subgraph_stores.is_empty(); let adapter = TriggersAdapterWrapper::new( chain .triggers_adapter(&deployment, &requirements, unified_api_version.clone()) @@ -181,20 +182,26 @@ impl BlockStreamBuilder for EthereumStreamBuilder { // This is ok because Celo blocks are always final. And we _need_ to do this because // some events appear only in eth_getLogs but not in transaction receipts. // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50. - let chain_id = match chain.chain_client().as_ref() { + let reorg_threshold = match chain.chain_client().as_ref() { ChainClient::Rpc(adapter) => { - adapter + let chain_id = adapter .cheapest() .await .ok_or(anyhow!("unable to get eth adapter for chan_id call"))? .chain_id() - .await? + .await?; + + if CELO_CHAIN_IDS.contains(&chain_id) { + 0 + } else { + chain.reorg_threshold + } } - _ => panic!("expected rpc when using polling blockstream"), - }; - let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) { - false => chain.reorg_threshold, - true => 0, + _ if is_using_subgraph_composition => chain.reorg_threshold, + _ => panic!( + "expected rpc when using polling blockstream : {}", + is_using_subgraph_composition + ), }; Ok(Box::new(PollingBlockStream::new( @@ -617,6 +624,8 @@ pub enum BlockFinality { // If a block may still be reorged, we need to work with more local data. NonFinal(EthereumBlockWithCalls), + + Ptr(Arc), } impl Default for BlockFinality { @@ -630,6 +639,7 @@ impl BlockFinality { match self { BlockFinality::Final(block) => block, BlockFinality::NonFinal(block) => &block.ethereum_block.block, + BlockFinality::Ptr(_) => unreachable!("light_block called on HeaderOnly"), } } } @@ -639,6 +649,7 @@ impl<'a> From<&'a BlockFinality> for BlockPtr { match block { BlockFinality::Final(b) => BlockPtr::from(&**b), BlockFinality::NonFinal(b) => BlockPtr::from(&b.ethereum_block), + BlockFinality::Ptr(b) => BlockPtr::new(b.hash.clone(), b.number), } } } @@ -648,6 +659,7 @@ impl Block for BlockFinality { match self { BlockFinality::Final(block) => block.block_ptr(), BlockFinality::NonFinal(block) => block.ethereum_block.block.block_ptr(), + BlockFinality::Ptr(block) => BlockPtr::new(block.hash.clone(), block.number), } } @@ -655,6 +667,9 @@ impl Block for BlockFinality { match self { BlockFinality::Final(block) => block.parent_ptr(), BlockFinality::NonFinal(block) => block.ethereum_block.block.parent_ptr(), + BlockFinality::Ptr(block) => { + Some(BlockPtr::new(block.parent_hash.clone(), block.number - 1)) + } } } @@ -687,6 +702,7 @@ impl Block for BlockFinality { json::to_value(eth_block) } BlockFinality::NonFinal(block) => json::to_value(&block.ethereum_block), + BlockFinality::Ptr(_) => Ok(json::Value::Null), } } @@ -694,6 +710,7 @@ impl Block for BlockFinality { let ts = match self { BlockFinality::Final(block) => block.timestamp, BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp, + BlockFinality::Ptr(block) => block.timestamp, }; let ts = i64::try_from(ts.as_u64()).unwrap(); BlockTime::since_epoch(ts, 0) @@ -735,7 +752,7 @@ impl TriggersAdapterTrait for TriggersAdapter { .await } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, logger: Logger, block_numbers: HashSet, @@ -749,9 +766,9 @@ impl TriggersAdapterTrait for TriggersAdapter { .await?; let blocks = adapter - .load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers) + .load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers) .await - .map(|block| BlockFinality::Final(block)) + .map(|block| BlockFinality::Ptr(block)) .collect() .compat() .await?; @@ -812,6 +829,7 @@ impl TriggersAdapterTrait for TriggersAdapter { triggers.append(&mut parse_block_triggers(&filter.block, full_block)); Ok(BlockWithTriggers::new(block, triggers, logger)) } + BlockFinality::Ptr(_) => unreachable!("triggers_in_block called on HeaderOnly"), } } diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 8fd836d1742..2da9c95863f 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1,6 +1,7 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered}; use graph::blockchain::client::ChainClient; use graph::blockchain::BlockHash; +use graph::blockchain::BlockPtrExt; use graph::blockchain::ChainIdentifier; use graph::components::transaction_receipt::LightTransactionReceipt; @@ -783,11 +784,11 @@ impl EthereumAdapter { } /// Request blocks by number through JSON-RPC. - fn load_blocks_by_numbers_rpc( + fn load_block_ptrs_by_numbers_rpc( &self, logger: Logger, numbers: Vec, - ) -> impl Stream, Error = Error> + Send { + ) -> impl Stream, Error = Error> + Send { let web3 = self.web3.clone(); stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| { @@ -798,19 +799,29 @@ impl EthereumAdapter { .run(move || { Box::pin( web3.eth() - .block_with_txs(BlockId::Number(Web3BlockNumber::Number( - number.into(), - ))), + .block(BlockId::Number(Web3BlockNumber::Number(number.into()))), ) .compat() .from_err::() .and_then(move |block| { - block.map(Arc::new).ok_or_else(|| { - anyhow::anyhow!( - "Ethereum node did not find block with number {:?}", - number - ) - }) + block + .map(|block| { + let ptr = BlockPtrExt::try_from(( + block.hash, + block.number, + block.parent_hash, + block.timestamp, + )) + .unwrap(); + + Arc::new(ptr) + }) + .ok_or_else(|| { + anyhow::anyhow!( + "Ethereum node did not find block with number {:?}", + number + ) + }) }) .compat() }) @@ -1690,15 +1701,15 @@ impl EthereumAdapterTrait for EthereumAdapter { } /// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream. - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, logger: Logger, chain_store: Arc, block_numbers: HashSet, - ) -> Box, Error = Error> + Send> { + ) -> Box, Error = Error> + Send> { let blocks_map: BTreeMap> = chain_store .cheap_clone() - .blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) + .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) .await .map_err(|e| { error!(&logger, "Error accessing block cache {}", e); @@ -1706,7 +1717,7 @@ impl EthereumAdapterTrait for EthereumAdapter { }) .unwrap_or_default(); - let mut blocks: Vec> = blocks_map + let mut blocks: Vec> = blocks_map .into_iter() .filter_map(|(_number, values)| { if values.len() == 1 { @@ -1719,7 +1730,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let missing_blocks: Vec = block_numbers .into_iter() - .filter(|&number| !blocks.iter().any(|block| block.number() == number)) + .filter(|&number| !blocks.iter().any(|block| block.block_number() == number)) .collect(); if !missing_blocks.is_empty() { @@ -1731,20 +1742,9 @@ impl EthereumAdapterTrait for EthereumAdapter { } Box::new( - self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks) + self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks) .collect() .map(move |new_blocks| { - let upsert_blocks: Vec<_> = new_blocks - .iter() - .map(|block| BlockFinality::Final(block.clone())) - .collect(); - let block_refs: Vec<_> = upsert_blocks - .iter() - .map(|block| block as &dyn graph::blockchain::Block) - .collect(); - if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) { - error!(logger, "Error writing to block cache {}", e); - } blocks.extend(new_blocks); blocks.sort_by_key(|block| block.number); stream::iter_ok(blocks) @@ -2028,6 +2028,9 @@ pub(crate) async fn get_calls( calls: Some(calls), })) } + BlockFinality::Ptr(_) => { + unreachable!("get_calls called with BlockFinality::Ptr") + } } } @@ -2209,6 +2212,11 @@ async fn filter_call_triggers_from_unsuccessful_transactions( "this function should not be called when dealing with non-final blocks" ) } + BlockFinality::Ptr(_block) => { + unreachable!( + "this function should not be called when dealing with header-only blocks" + ) + } } }; diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 934576d742b..3b9643cf7af 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -325,7 +325,7 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index e82df2fde48..d32f831d3a6 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -372,7 +372,7 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index db4034cd55c..3e6dafcb2f0 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -136,7 +136,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { unimplemented!() } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index e32d2bb624b..3fd035bc387 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -419,7 +419,7 @@ async fn scan_subgraph_triggers( block_numbers.insert(to); let blocks = adapter - .load_blocks_by_numbers(logger.clone(), block_numbers) + .load_block_ptrs_by_numbers(logger.clone(), block_numbers) .await?; create_subgraph_triggers::(logger.clone(), blocks, filter, entities).await @@ -591,7 +591,7 @@ pub trait TriggersAdapter: Send + Sync { /// Get pointer to parent of `block`. This is called when reverting `block`. async fn chain_head_ptr(&self) -> Result, Error>; - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, logger: Logger, block_numbers: HashSet, diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 18f1de92546..2f1480dd46a 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -229,7 +229,7 @@ impl TriggersAdapter for MockTriggersAdapter { todo!() } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 0b32b7664aa..52b3ec56904 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -54,7 +54,7 @@ pub use block_stream::{ChainHeadUpdateListener, ChainHeadUpdateStream, TriggersA pub use builder::{BasicBlockchainBuilder, BlockchainBuilder}; pub use empty_node_capabilities::EmptyNodeCapabilities; pub use noop_runtime_adapter::NoopRuntimeAdapter; -pub use types::{BlockHash, BlockPtr, BlockTime, ChainIdentifier}; +pub use types::{BlockHash, BlockPtr, BlockPtrExt, BlockTime, ChainIdentifier}; use self::{ block_stream::{BlockStream, FirehoseCursor}, diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 7c670d4cdd6..76806dfec3f 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -5,10 +5,11 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Timestamptz; use diesel::sql_types::{Bytea, Nullable, Text}; use diesel_derives::{AsExpression, FromSqlRow}; +use serde::Deserialize; use std::convert::TryFrom; use std::time::Duration; use std::{fmt, str::FromStr}; -use web3::types::{Block, H256}; +use web3::types::{Block, H256, U256, U64}; use crate::cheap_clone::CheapClone; use crate::components::store::BlockNumber; @@ -20,7 +21,7 @@ use crate::prelude::{r, BigInt, TryFromValue, Value, ValueMap}; use crate::util::stable_hash_glue::{impl_stable_hash, AsBytes}; /// A simple marker for byte arrays that are really block hashes -#[derive(Clone, Default, PartialEq, Eq, Hash, FromSqlRow, AsExpression)] +#[derive(Clone, Default, PartialEq, Eq, Hash, FromSqlRow, AsExpression, Deserialize)] #[diesel(sql_type = Bytea)] pub struct BlockHash(pub Box<[u8]>); @@ -330,6 +331,143 @@ impl From for BlockNumber { } } +#[derive(Clone, PartialEq, Eq, Hash, Deserialize)] +pub struct BlockPtrExt { + pub hash: BlockHash, + pub number: BlockNumber, + pub parent_hash: BlockHash, + pub timestamp: U256, +} + +impl BlockPtrExt { + pub fn new( + hash: BlockHash, + number: BlockNumber, + parent_hash: BlockHash, + timestamp: U256, + ) -> Self { + Self { + hash, + number, + parent_hash, + timestamp, + } + } + + /// Encodes the block hash into a hexadecimal string **without** a "0x" prefix. + /// Hashes are stored in the database in this format. + pub fn hash_hex(&self) -> String { + self.hash.hash_hex() + } + + /// Encodes the parent block hash into a hexadecimal string **without** a "0x" prefix. + pub fn parent_hash_hex(&self) -> String { + self.parent_hash.hash_hex() + } + + /// Block number to be passed into the store. Panics if it does not fit in an i32. + pub fn block_number(&self) -> BlockNumber { + self.number + } + + pub fn hash_as_h256(&self) -> H256 { + H256::from_slice(&self.hash_slice()[..32]) + } + + pub fn parent_hash_as_h256(&self) -> H256 { + H256::from_slice(&self.parent_hash_slice()[..32]) + } + + pub fn hash_slice(&self) -> &[u8] { + self.hash.0.as_ref() + } + + pub fn parent_hash_slice(&self) -> &[u8] { + self.parent_hash.0.as_ref() + } +} + +impl fmt::Display for BlockPtrExt { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "#{} ({}) [parent: {}]", + self.number, + self.hash_hex(), + self.parent_hash_hex() + ) + } +} + +impl fmt::Debug for BlockPtrExt { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "#{} ({}) [parent: {}]", + self.number, + self.hash_hex(), + self.parent_hash_hex() + ) + } +} + +impl slog::Value for BlockPtrExt { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::Value::serialize(&self.to_string(), record, key, serializer) + } +} + +impl IntoValue for BlockPtrExt { + fn into_value(self) -> r::Value { + object! { + __typename: "Block", + hash: self.hash_hex(), + number: format!("{}", self.number), + parent_hash: self.parent_hash_hex(), + timestamp: format!("{}", self.timestamp), + } + } +} + +impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { + type Error = anyhow::Error; + + fn try_from(tuple: (Option, Option, H256, U256)) -> Result { + let (hash_opt, number_opt, parent_hash, timestamp) = tuple; + + let hash = hash_opt.ok_or_else(|| anyhow!("Block hash is missing"))?; + let number = number_opt + .ok_or_else(|| anyhow!("Block number is missing"))? + .as_u64(); + + let block_number = + i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?; + + Ok(BlockPtrExt { + hash: hash.into(), + number: block_number, + parent_hash: parent_hash.into(), + timestamp, + }) + } +} +impl From for H256 { + fn from(ptr: BlockPtrExt) -> Self { + ptr.hash_as_h256() + } +} + +impl From for BlockNumber { + fn from(ptr: BlockPtrExt) -> Self { + ptr.number + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] /// A collection of attributes that (kind of) uniquely identify a blockchain. pub struct ChainIdentifier { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 906169c5438..2d69f456380 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -523,7 +523,7 @@ pub trait ChainStore: Send + Sync + 'static { ) -> Result, Error>; /// Returns the blocks present in the store for the given block numbers. - async fn blocks_by_numbers( + async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, ) -> Result>, Error>; diff --git a/node/src/chain.rs b/node/src/chain.rs index 3e87ff8295b..dab3890f24e 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -529,6 +529,7 @@ pub async fn networks_as_chains( }; let client = Arc::new(cc); + let eth_adapters = Arc::new(eth_adapters); let adapter_selector = EthereumAdapterSelector::new( logger_factory.clone(), client.clone(), @@ -551,7 +552,7 @@ pub async fn networks_as_chains( Arc::new(EthereumBlockRefetcher {}), Arc::new(adapter_selector), Arc::new(EthereumRuntimeAdapterBuilder {}), - Arc::new(eth_adapters.clone()), + eth_adapters, ENV_VARS.reorg_threshold, polling_interval, true, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 443305dc197..0a67cd45ff9 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -580,7 +580,7 @@ mod data { Ok(()) } - pub(super) fn blocks_by_numbers( + pub(super) fn block_ptrs_by_numbers( &self, conn: &mut PgConnection, chain: &str, @@ -1930,7 +1930,7 @@ impl ChainStore { .with_conn(move |conn, _| { store .storage - .blocks_by_numbers(conn, &store.chain, &numbers) + .block_ptrs_by_numbers(conn, &store.chain, &numbers) .map_err(CancelableError::from) }) .await?; @@ -2142,7 +2142,7 @@ impl ChainStoreTrait for ChainStore { Ok(()) } - async fn blocks_by_numbers( + async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, ) -> Result>, Error> { @@ -2163,7 +2163,7 @@ impl ChainStoreTrait for ChainStore { .collect(); Ok(values) } else { - let cached = self.recent_blocks_cache.get_blocks_by_numbers(&numbers); + let cached = self.recent_blocks_cache.get_block_ptrs_by_numbers(&numbers); let stored = if cached.len() < numbers.len() { let missing_numbers = numbers @@ -2655,7 +2655,7 @@ mod recent_blocks_cache { blocks } - pub fn get_blocks_by_numbers( + pub fn get_block_ptrs_by_numbers( &self, numbers: &[BlockNumber], ) -> Vec<(BlockPtr, json::Value)> { diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 065ee6484cf..e17bb3d56ea 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -978,7 +978,7 @@ impl TriggersAdapter for MockTriggersAdapter { todo!() } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, From eb1cb5de7b4b7f4d09bd644b689ddde1f6317657 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 5 Sep 2024 11:58:05 +0530 Subject: [PATCH 2/6] graph: implement deserialize for BlockPtrExt and add tests --- graph/src/blockchain/types.rs | 92 ++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 76806dfec3f..ed9e2d6071a 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -5,7 +5,7 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Timestamptz; use diesel::sql_types::{Bytea, Nullable, Text}; use diesel_derives::{AsExpression, FromSqlRow}; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use std::convert::TryFrom; use std::time::Duration; use std::{fmt, str::FromStr}; @@ -21,7 +21,7 @@ use crate::prelude::{r, BigInt, TryFromValue, Value, ValueMap}; use crate::util::stable_hash_glue::{impl_stable_hash, AsBytes}; /// A simple marker for byte arrays that are really block hashes -#[derive(Clone, Default, PartialEq, Eq, Hash, FromSqlRow, AsExpression, Deserialize)] +#[derive(Clone, Default, PartialEq, Eq, Hash, FromSqlRow, AsExpression)] #[diesel(sql_type = Bytea)] pub struct BlockHash(pub Box<[u8]>); @@ -49,6 +49,16 @@ impl BlockHash { } } +impl<'de> Deserialize<'de> for BlockHash { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s: String = Deserialize::deserialize(deserializer)?; + BlockHash::from_str(&s).map_err(serde::de::Error::custom) + } +} + impl CheapClone for BlockHash { fn cheap_clone(&self) -> Self { Self(self.0.clone()) @@ -331,9 +341,25 @@ impl From for BlockNumber { } } +fn deserialize_block_number<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer)?; + + if s.starts_with("0x") { + let s = s.trim_start_matches("0x"); + i32::from_str_radix(s, 16).map_err(serde::de::Error::custom) + } else { + i32::from_str(&s).map_err(serde::de::Error::custom) + } +} + #[derive(Clone, PartialEq, Eq, Hash, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct BlockPtrExt { pub hash: BlockHash, + #[serde(deserialize_with = "deserialize_block_number")] pub number: BlockNumber, pub parent_hash: BlockHash, pub timestamp: U256, @@ -577,3 +603,65 @@ impl ToSql for BlockTime { >::to_sql(&self.0, out) } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json; + + #[test] + fn test_blockhash_deserialization() { + let json_data = "\"0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac\""; + + let block_hash: BlockHash = + serde_json::from_str(json_data).expect("Deserialization failed"); + + let expected_bytes = + hex::decode("8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac") + .expect("Hex decoding failed"); + + assert_eq!( + *block_hash.0, expected_bytes, + "BlockHash does not match expected bytes" + ); + } + + #[test] + fn test_block_ptr_ext_deserialization() { + // JSON data with a hex string for BlockNumber + let json_data = r#" + { + "hash": "0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac", + "number": "0x2A", + "parentHash": "0xabc123", + "timestamp": "123456789012345678901234567890" + } + "#; + + // Deserialize the JSON string into a BlockPtrExt + let block_ptr_ext: BlockPtrExt = + serde_json::from_str(json_data).expect("Deserialization failed"); + + // Verify the deserialized values + assert_eq!(block_ptr_ext.number, 42); // 0x2A in hex is 42 in decimal + } + + #[test] + fn test_invalid_block_number_deserialization() { + let invalid_json_data = r#" + { + "hash": "0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac", + "number": "invalid_hex_string", + "parentHash": "0xabc123", + "timestamp": "123456789012345678901234567890" + } + "#; + + let result: Result = serde_json::from_str(invalid_json_data); + + assert!( + result.is_err(), + "Deserialization should have failed for invalid block number" + ); + } +} From 80a0d4b3a179ad0b637380bbe4cd49cc563c063d Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 5 Sep 2024 13:28:00 +0530 Subject: [PATCH 3/6] chain/ethereum: Add block_ptr_batch_size env variable --- chain/ethereum/src/env.rs | 6 ++++++ chain/ethereum/src/ethereum_adapter.rs | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/chain/ethereum/src/env.rs b/chain/ethereum/src/env.rs index 75c313212b9..bc7223dbc07 100644 --- a/chain/ethereum/src/env.rs +++ b/chain/ethereum/src/env.rs @@ -33,6 +33,9 @@ pub struct EnvVars { /// Set by the environment variable `ETHEREUM_BLOCK_BATCH_SIZE`. The /// default value is 10 blocks. pub block_batch_size: usize, + /// Set by the environment variable `ETHEREUM_BLOCK_PTR_BATCH_SIZE`. The + /// default value is 10 blocks. + pub block_ptr_batch_size: usize, /// Maximum number of blocks to request in each chunk. /// /// Set by the environment variable `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE`. @@ -116,6 +119,7 @@ impl From for EnvVars { trace_stream_step_size: x.trace_stream_step_size, max_event_only_range: x.max_event_only_range, block_batch_size: x.block_batch_size, + block_ptr_batch_size: x.block_ptr_batch_size, max_block_range_size: x.max_block_range_size, json_rpc_timeout: Duration::from_secs(x.json_rpc_timeout_in_secs), block_receipts_check_timeout: Duration::from_secs( @@ -160,6 +164,8 @@ struct Inner { max_event_only_range: BlockNumber, #[envconfig(from = "ETHEREUM_BLOCK_BATCH_SIZE", default = "10")] block_batch_size: usize, + #[envconfig(from = "ETHEREUM_BLOCK_PTR_BATCH_SIZE", default = "100")] + block_ptr_batch_size: usize, #[envconfig(from = "GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE", default = "2000")] max_block_range_size: BlockNumber, #[envconfig(from = "GRAPH_ETHEREUM_JSON_RPC_TIMEOUT", default = "180")] diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 2da9c95863f..117783cf815 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -829,7 +829,7 @@ impl EthereumAdapter { .compat() .from_err() })) - .buffered(ENV_VARS.block_batch_size) + .buffered(ENV_VARS.block_ptr_batch_size) } /// Request blocks ptrs for numbers through JSON-RPC. From 9732a46d107541f97d053389d1e0a46a2bf3c7dc Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 5 Sep 2024 13:36:53 +0530 Subject: [PATCH 4/6] chain/ethereum: bump block range size for composed subgraphs --- chain/ethereum/src/chain.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index ac2422ff761..ae7a7df1724 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -204,6 +204,12 @@ impl BlockStreamBuilder for EthereumStreamBuilder { ), }; + let max_block_range_size = if is_using_subgraph_composition { + ENV_VARS.max_block_range_size * 10 + } else { + ENV_VARS.max_block_range_size + }; + Ok(Box::new(PollingBlockStream::new( chain_store, chain_head_update_stream, @@ -214,7 +220,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { start_blocks, reorg_threshold, logger, - ENV_VARS.max_block_range_size, + max_block_range_size, ENV_VARS.target_triggers_per_block_range, unified_api_version, subgraph_current_block, From d27793e4b50547d2d30b199e5967841df0f9f4bb Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 7 Oct 2024 09:25:48 +0100 Subject: [PATCH 5/6] chain, graph: Fix block ptr ext parsing bug --- chain/ethereum/src/ethereum_adapter.rs | 4 +- graph/src/blockchain/types.rs | 15 +++++ graph/src/components/store/traits.rs | 4 +- store/postgres/src/chain_store.rs | 89 ++++++++++++++++---------- 4 files changed, 74 insertions(+), 38 deletions(-) diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 117783cf815..905d057f93d 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1707,7 +1707,7 @@ impl EthereumAdapterTrait for EthereumAdapter { chain_store: Arc, block_numbers: HashSet, ) -> Box, Error = Error> + Send> { - let blocks_map: BTreeMap> = chain_store + let blocks_map = chain_store .cheap_clone() .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) .await @@ -1721,7 +1721,7 @@ impl EthereumAdapterTrait for EthereumAdapter { .into_iter() .filter_map(|(_number, values)| { if values.len() == 1 { - json::from_value(values[0].clone()).ok() + Arc::new(values[0].clone()).into() } else { None } diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index ed9e2d6071a..2fa04a6e41c 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -482,6 +482,21 @@ impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { }) } } + +impl TryFrom<(H256, i32, H256, U256)> for BlockPtrExt { + type Error = anyhow::Error; + + fn try_from(tuple: (H256, i32, H256, U256)) -> Result { + let (hash, block_number, parent_hash, timestamp) = tuple; + + Ok(BlockPtrExt { + hash: hash.into(), + number: block_number, + parent_hash: parent_hash.into(), + timestamp, + }) + } +} impl From for H256 { fn from(ptr: BlockPtrExt) -> Self { ptr.hash_as_h256() diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 2d69f456380..f156af1aabf 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -7,7 +7,7 @@ use web3::types::{Address, H256}; use super::*; use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor}; -use crate::blockchain::{BlockTime, ChainIdentifier}; +use crate::blockchain::{BlockPtrExt, BlockTime, ChainIdentifier}; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::server::index_node::VersionInfo; use crate::components::subgraph::SubgraphVersionSwitchingMode; @@ -526,7 +526,7 @@ pub trait ChainStore: Send + Sync + 'static { async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, - ) -> Result>, Error>; + ) -> Result>, Error>; /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 0a67cd45ff9..daf0e70a4d1 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; @@ -21,9 +22,9 @@ use std::{ sync::Arc, }; -use graph::blockchain::{Block, BlockHash, ChainIdentifier}; +use graph::blockchain::{Block, BlockHash, BlockPtrExt, ChainIdentifier}; use graph::cheap_clone::CheapClone; -use graph::prelude::web3::types::H256; +use graph::prelude::web3::types::{H256, U256}; use graph::prelude::{ async_trait, serde_json as json, transaction_receipt::LightTransactionReceipt, BlockNumber, BlockPtr, CachedEthereumCall, CancelableError, ChainStore as ChainStoreTrait, Error, @@ -53,6 +54,14 @@ impl JsonBlock { data, } } + + fn timestamp(&self) -> Option { + self.data + .as_ref() + .and_then(|data| data.get("timestamp")) + .and_then(|ts| ts.as_str()) + .and_then(|ts| U256::from_dec_str(ts).ok()) + } } /// Tables in the 'public' database schema that store chain-specific data @@ -1949,6 +1958,20 @@ impl ChainStore { } } +fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result { + let hash = json_block.ptr.hash.clone(); + let number = json_block.ptr.number; + let parent_hash = json_block.parent_hash.clone(); + + let timestamp = json_block + .timestamp() + .ok_or_else(|| anyhow!("Timestamp is missing"))?; + + let ptr = BlockPtrExt::try_from((hash.as_h256(), number, parent_hash.as_h256(), timestamp)) + .map_err(|e| anyhow!("Failed to convert to BlockPtrExt: {}", e))?; + + Ok(ptr) +} #[async_trait] impl ChainStoreTrait for ChainStore { fn genesis_block_ptr(&self) -> Result { @@ -2145,23 +2168,11 @@ impl ChainStoreTrait for ChainStore { async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, - ) -> Result>, Error> { - if ENV_VARS.store.disable_block_cache_for_lookup { - let values = self - .blocks_from_store_by_numbers(numbers) - .await? - .into_iter() - .map(|(num, blocks)| { - ( - num, - blocks - .into_iter() - .filter_map(|block| block.data) - .collect::>(), - ) - }) - .collect(); - Ok(values) + ) -> Result>, Error> { + let result = if ENV_VARS.store.disable_block_cache_for_lookup { + let values = self.blocks_from_store_by_numbers(numbers).await?; + + values } else { let cached = self.recent_blocks_cache.get_block_ptrs_by_numbers(&numbers); @@ -2209,16 +2220,28 @@ impl ChainStoreTrait for ChainStore { .map(|(ptr, data)| (ptr.block_number(), vec![data])) .collect::>(); - let mut result: BTreeMap> = cached_map; + let mut result = cached_map; for (num, blocks) in stored { - result - .entry(num) - .or_default() - .extend(blocks.into_iter().filter_map(|block| block.data)); + if !result.contains_key(&num) { + result.insert(num, blocks); + } } - Ok(result) - } + result + }; + + let ptrs = result + .into_iter() + .map(|(num, blocks)| { + let ptrs = blocks + .into_iter() + .filter_map(|block| json_block_to_block_ptr_ext(&block).ok()) + .collect(); + (num, ptrs) + }) + .collect(); + + Ok(ptrs) } async fn blocks(self: Arc, hashes: Vec) -> Result, Error> { @@ -2527,10 +2550,8 @@ mod recent_blocks_cache { .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) } - fn get_block_by_number(&self, number: BlockNumber) -> Option<(&BlockPtr, &json::Value)> { - self.blocks - .get(&number) - .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) + fn get_block_by_number(&self, number: BlockNumber) -> Option<&JsonBlock> { + self.blocks.get(&number) } fn get_ancestor( @@ -2658,13 +2679,13 @@ mod recent_blocks_cache { pub fn get_block_ptrs_by_numbers( &self, numbers: &[BlockNumber], - ) -> Vec<(BlockPtr, json::Value)> { + ) -> Vec<(BlockPtr, JsonBlock)> { let inner = self.inner.read(); - let mut blocks: Vec<(BlockPtr, json::Value)> = Vec::new(); + let mut blocks: Vec<(BlockPtr, JsonBlock)> = Vec::new(); for &number in numbers { - if let Some((ptr, block)) = inner.get_block_by_number(number) { - blocks.push((ptr.clone(), block.clone())); + if let Some(block) = inner.get_block_by_number(number) { + blocks.push((block.ptr.clone(), block.clone())); } } From f1bb58c99f7a382cd3caf26c1f0567434afc2d09 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 19 Nov 2024 15:20:36 +0400 Subject: [PATCH 6/6] chain, graph: rename BlockPtrExt to ExtendedBlockPtr --- chain/ethereum/src/adapter.rs | 4 +-- chain/ethereum/src/chain.rs | 6 ++--- chain/ethereum/src/ethereum_adapter.rs | 10 ++++---- graph/src/blockchain/mod.rs | 2 +- graph/src/blockchain/types.rs | 34 +++++++++++++------------- graph/src/components/store/traits.rs | 4 +-- store/postgres/src/chain_store.rs | 11 +++++---- 7 files changed, 36 insertions(+), 35 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index c969f5521d9..1ccdb3d8f2d 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,7 +1,7 @@ use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; -use graph::blockchain::BlockPtrExt; use graph::blockchain::ChainIdentifier; +use graph::blockchain::ExtendedBlockPtr; use graph::components::subgraph::MappingError; use graph::data::store::ethereum::call; use graph::firehose::CallToFilter; @@ -1115,7 +1115,7 @@ pub trait EthereumAdapter: Send + Sync + 'static { _logger: Logger, _chain_store: Arc, _block_numbers: HashSet, - ) -> Box, Error = Error> + Send>; + ) -> Box, Error = Error> + Send>; /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index ae7a7df1724..a92d74ca788 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -3,8 +3,8 @@ use anyhow::{Context, Error}; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms}; use graph::blockchain::{ - BlockIngestor, BlockPtrExt, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper, - TriggersAdapterSelector, + BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, ExtendedBlockPtr, + TriggerFilterWrapper, TriggersAdapterSelector, }; use graph::components::adapter::ChainId; use graph::components::store::{DeploymentCursorTracker, SourceableStore}; @@ -631,7 +631,7 @@ pub enum BlockFinality { // If a block may still be reorged, we need to work with more local data. NonFinal(EthereumBlockWithCalls), - Ptr(Arc), + Ptr(Arc), } impl Default for BlockFinality { diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 905d057f93d..dc672f1b5d9 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1,8 +1,8 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered}; use graph::blockchain::client::ChainClient; use graph::blockchain::BlockHash; -use graph::blockchain::BlockPtrExt; use graph::blockchain::ChainIdentifier; +use graph::blockchain::ExtendedBlockPtr; use graph::components::transaction_receipt::LightTransactionReceipt; use graph::data::store::ethereum::call; @@ -788,7 +788,7 @@ impl EthereumAdapter { &self, logger: Logger, numbers: Vec, - ) -> impl Stream, Error = Error> + Send { + ) -> impl Stream, Error = Error> + Send { let web3 = self.web3.clone(); stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| { @@ -806,7 +806,7 @@ impl EthereumAdapter { .and_then(move |block| { block .map(|block| { - let ptr = BlockPtrExt::try_from(( + let ptr = ExtendedBlockPtr::try_from(( block.hash, block.number, block.parent_hash, @@ -1706,7 +1706,7 @@ impl EthereumAdapterTrait for EthereumAdapter { logger: Logger, chain_store: Arc, block_numbers: HashSet, - ) -> Box, Error = Error> + Send> { + ) -> Box, Error = Error> + Send> { let blocks_map = chain_store .cheap_clone() .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) @@ -1717,7 +1717,7 @@ impl EthereumAdapterTrait for EthereumAdapter { }) .unwrap_or_default(); - let mut blocks: Vec> = blocks_map + let mut blocks: Vec> = blocks_map .into_iter() .filter_map(|(_number, values)| { if values.len() == 1 { diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 52b3ec56904..76112ea3528 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -54,7 +54,7 @@ pub use block_stream::{ChainHeadUpdateListener, ChainHeadUpdateStream, TriggersA pub use builder::{BasicBlockchainBuilder, BlockchainBuilder}; pub use empty_node_capabilities::EmptyNodeCapabilities; pub use noop_runtime_adapter::NoopRuntimeAdapter; -pub use types::{BlockHash, BlockPtr, BlockPtrExt, BlockTime, ChainIdentifier}; +pub use types::{BlockHash, BlockPtr, BlockTime, ChainIdentifier, ExtendedBlockPtr}; use self::{ block_stream::{BlockStream, FirehoseCursor}, diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 2fa04a6e41c..4429dd87304 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -357,7 +357,7 @@ where #[derive(Clone, PartialEq, Eq, Hash, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct BlockPtrExt { +pub struct ExtendedBlockPtr { pub hash: BlockHash, #[serde(deserialize_with = "deserialize_block_number")] pub number: BlockNumber, @@ -365,7 +365,7 @@ pub struct BlockPtrExt { pub timestamp: U256, } -impl BlockPtrExt { +impl ExtendedBlockPtr { pub fn new( hash: BlockHash, number: BlockNumber, @@ -413,7 +413,7 @@ impl BlockPtrExt { } } -impl fmt::Display for BlockPtrExt { +impl fmt::Display for ExtendedBlockPtr { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, @@ -425,7 +425,7 @@ impl fmt::Display for BlockPtrExt { } } -impl fmt::Debug for BlockPtrExt { +impl fmt::Debug for ExtendedBlockPtr { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, @@ -437,7 +437,7 @@ impl fmt::Debug for BlockPtrExt { } } -impl slog::Value for BlockPtrExt { +impl slog::Value for ExtendedBlockPtr { fn serialize( &self, record: &slog::Record, @@ -448,7 +448,7 @@ impl slog::Value for BlockPtrExt { } } -impl IntoValue for BlockPtrExt { +impl IntoValue for ExtendedBlockPtr { fn into_value(self) -> r::Value { object! { __typename: "Block", @@ -460,7 +460,7 @@ impl IntoValue for BlockPtrExt { } } -impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { +impl TryFrom<(Option, Option, H256, U256)> for ExtendedBlockPtr { type Error = anyhow::Error; fn try_from(tuple: (Option, Option, H256, U256)) -> Result { @@ -474,7 +474,7 @@ impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { let block_number = i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?; - Ok(BlockPtrExt { + Ok(ExtendedBlockPtr { hash: hash.into(), number: block_number, parent_hash: parent_hash.into(), @@ -483,13 +483,13 @@ impl TryFrom<(Option, Option, H256, U256)> for BlockPtrExt { } } -impl TryFrom<(H256, i32, H256, U256)> for BlockPtrExt { +impl TryFrom<(H256, i32, H256, U256)> for ExtendedBlockPtr { type Error = anyhow::Error; fn try_from(tuple: (H256, i32, H256, U256)) -> Result { let (hash, block_number, parent_hash, timestamp) = tuple; - Ok(BlockPtrExt { + Ok(ExtendedBlockPtr { hash: hash.into(), number: block_number, parent_hash: parent_hash.into(), @@ -497,14 +497,14 @@ impl TryFrom<(H256, i32, H256, U256)> for BlockPtrExt { }) } } -impl From for H256 { - fn from(ptr: BlockPtrExt) -> Self { +impl From for H256 { + fn from(ptr: ExtendedBlockPtr) -> Self { ptr.hash_as_h256() } } -impl From for BlockNumber { - fn from(ptr: BlockPtrExt) -> Self { +impl From for BlockNumber { + fn from(ptr: ExtendedBlockPtr) -> Self { ptr.number } } @@ -653,8 +653,8 @@ mod tests { } "#; - // Deserialize the JSON string into a BlockPtrExt - let block_ptr_ext: BlockPtrExt = + // Deserialize the JSON string into a ExtendedBlockPtr + let block_ptr_ext: ExtendedBlockPtr = serde_json::from_str(json_data).expect("Deserialization failed"); // Verify the deserialized values @@ -672,7 +672,7 @@ mod tests { } "#; - let result: Result = serde_json::from_str(invalid_json_data); + let result: Result = serde_json::from_str(invalid_json_data); assert!( result.is_err(), diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index f156af1aabf..69ca216e007 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -7,7 +7,7 @@ use web3::types::{Address, H256}; use super::*; use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor}; -use crate::blockchain::{BlockPtrExt, BlockTime, ChainIdentifier}; +use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr}; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::server::index_node::VersionInfo; use crate::components::subgraph::SubgraphVersionSwitchingMode; @@ -526,7 +526,7 @@ pub trait ChainStore: Send + Sync + 'static { async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, - ) -> Result>, Error>; + ) -> Result>, Error>; /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index daf0e70a4d1..097aa799eff 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -22,7 +22,7 @@ use std::{ sync::Arc, }; -use graph::blockchain::{Block, BlockHash, BlockPtrExt, ChainIdentifier}; +use graph::blockchain::{Block, BlockHash, ChainIdentifier, ExtendedBlockPtr}; use graph::cheap_clone::CheapClone; use graph::prelude::web3::types::{H256, U256}; use graph::prelude::{ @@ -1958,7 +1958,7 @@ impl ChainStore { } } -fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result { +fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result { let hash = json_block.ptr.hash.clone(); let number = json_block.ptr.number; let parent_hash = json_block.parent_hash.clone(); @@ -1967,8 +1967,9 @@ fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result, numbers: Vec, - ) -> Result>, Error> { + ) -> Result>, Error> { let result = if ENV_VARS.store.disable_block_cache_for_lookup { let values = self.blocks_from_store_by_numbers(numbers).await?;