Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph composition: Use block ptrs instead of full blocks in blockstream #5630

Open
wants to merge 6 commits into
base: krishna/subgraph-comp-use-block-cache
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}))
}

async fn load_blocks_by_numbers(
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
Expand Down
6 changes: 2 additions & 4 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,13 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

async fn load_blocks_by_numbers(
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<Block>, Error> {
unimplemented!()
todo!()
}

async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
unimplemented!()
}
Expand Down
5 changes: 3 additions & 2 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Error;
use ethabi::{Error as ABIError, Function, ParamType, Token};
use graph::blockchain::ChainIdentifier;
use graph::blockchain::ExtendedBlockPtr;
use graph::components::subgraph::MappingError;
use graph::data::store::ethereum::call;
use graph::firehose::CallToFilter;
Expand Down Expand Up @@ -1109,12 +1110,12 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block_hash: H256,
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;

async fn load_blocks_by_numbers(
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_chain_store: Arc<dyn ChainStore>,
_block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
) -> Box<dyn Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send>;

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
/// May use the `chain_store` as a cache.
Expand Down
50 changes: 37 additions & 13 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use anyhow::{Context, Error};
use graph::blockchain::client::ChainClient;
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
use graph::blockchain::{
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper,
TriggersAdapterSelector,
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, ExtendedBlockPtr,
TriggerFilterWrapper, TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
Expand Down Expand Up @@ -156,6 +156,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
let requirements = filter.chain_filter.node_capabilities();
let is_using_subgraph_composition = !source_subgraph_stores.is_empty();
let adapter = TriggersAdapterWrapper::new(
chain
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
Expand All @@ -181,20 +182,32 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
// This is ok because Celo blocks are always final. And we _need_ to do this because
// some events appear only in eth_getLogs but not in transaction receipts.
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
let chain_id = match chain.chain_client().as_ref() {
let reorg_threshold = match chain.chain_client().as_ref() {
ChainClient::Rpc(adapter) => {
adapter
let chain_id = adapter
.cheapest()
.await
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
.chain_id()
.await?
.await?;

if CELO_CHAIN_IDS.contains(&chain_id) {
0
} else {
chain.reorg_threshold
}
}
_ => panic!("expected rpc when using polling blockstream"),
_ if is_using_subgraph_composition => chain.reorg_threshold,
_ => panic!(
"expected rpc when using polling blockstream : {}",
is_using_subgraph_composition
),
};
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
false => chain.reorg_threshold,
true => 0,

let max_block_range_size = if is_using_subgraph_composition {
ENV_VARS.max_block_range_size * 10
} else {
ENV_VARS.max_block_range_size
};

Ok(Box::new(PollingBlockStream::new(
Expand All @@ -207,7 +220,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
start_blocks,
reorg_threshold,
logger,
ENV_VARS.max_block_range_size,
max_block_range_size,
ENV_VARS.target_triggers_per_block_range,
unified_api_version,
subgraph_current_block,
Expand Down Expand Up @@ -617,6 +630,8 @@ pub enum BlockFinality {

// If a block may still be reorged, we need to work with more local data.
NonFinal(EthereumBlockWithCalls),

Ptr(Arc<ExtendedBlockPtr>),
}

impl Default for BlockFinality {
Expand All @@ -630,6 +645,7 @@ impl BlockFinality {
match self {
BlockFinality::Final(block) => block,
BlockFinality::NonFinal(block) => &block.ethereum_block.block,
BlockFinality::Ptr(_) => unreachable!("light_block called on HeaderOnly"),
}
}
}
Expand All @@ -639,6 +655,7 @@ impl<'a> From<&'a BlockFinality> for BlockPtr {
match block {
BlockFinality::Final(b) => BlockPtr::from(&**b),
BlockFinality::NonFinal(b) => BlockPtr::from(&b.ethereum_block),
BlockFinality::Ptr(b) => BlockPtr::new(b.hash.clone(), b.number),
}
}
}
Expand All @@ -648,13 +665,17 @@ impl Block for BlockFinality {
match self {
BlockFinality::Final(block) => block.block_ptr(),
BlockFinality::NonFinal(block) => block.ethereum_block.block.block_ptr(),
BlockFinality::Ptr(block) => BlockPtr::new(block.hash.clone(), block.number),
}
}

fn parent_ptr(&self) -> Option<BlockPtr> {
match self {
BlockFinality::Final(block) => block.parent_ptr(),
BlockFinality::NonFinal(block) => block.ethereum_block.block.parent_ptr(),
BlockFinality::Ptr(block) => {
Some(BlockPtr::new(block.parent_hash.clone(), block.number - 1))
}
}
}

Expand Down Expand Up @@ -687,13 +708,15 @@ impl Block for BlockFinality {
json::to_value(eth_block)
}
BlockFinality::NonFinal(block) => json::to_value(&block.ethereum_block),
BlockFinality::Ptr(_) => Ok(json::Value::Null),
}
}

fn timestamp(&self) -> BlockTime {
let ts = match self {
BlockFinality::Final(block) => block.timestamp,
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
BlockFinality::Ptr(block) => block.timestamp,
};
let ts = i64::try_from(ts.as_u64()).unwrap();
BlockTime::since_epoch(ts, 0)
Expand Down Expand Up @@ -735,7 +758,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
.await
}

async fn load_blocks_by_numbers(
async fn load_block_ptrs_by_numbers(
&self,
logger: Logger,
block_numbers: HashSet<BlockNumber>,
Expand All @@ -749,9 +772,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
.await?;

let blocks = adapter
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
.load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers)
.await
.map(|block| BlockFinality::Final(block))
.map(|block| BlockFinality::Ptr(block))
.collect()
.compat()
.await?;
Expand Down Expand Up @@ -812,6 +835,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
triggers.append(&mut parse_block_triggers(&filter.block, full_block));
Ok(BlockWithTriggers::new(block, triggers, logger))
}
BlockFinality::Ptr(_) => unreachable!("triggers_in_block called on HeaderOnly"),
}
}

Expand Down
6 changes: 6 additions & 0 deletions chain/ethereum/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub struct EnvVars {
/// Set by the environment variable `ETHEREUM_BLOCK_BATCH_SIZE`. The
/// default value is 10 blocks.
pub block_batch_size: usize,
/// Set by the environment variable `ETHEREUM_BLOCK_PTR_BATCH_SIZE`. The
/// default value is 10 blocks.
pub block_ptr_batch_size: usize,
/// Maximum number of blocks to request in each chunk.
///
/// Set by the environment variable `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE`.
Expand Down Expand Up @@ -116,6 +119,7 @@ impl From<Inner> for EnvVars {
trace_stream_step_size: x.trace_stream_step_size,
max_event_only_range: x.max_event_only_range,
block_batch_size: x.block_batch_size,
block_ptr_batch_size: x.block_ptr_batch_size,
max_block_range_size: x.max_block_range_size,
json_rpc_timeout: Duration::from_secs(x.json_rpc_timeout_in_secs),
block_receipts_check_timeout: Duration::from_secs(
Expand Down Expand Up @@ -160,6 +164,8 @@ struct Inner {
max_event_only_range: BlockNumber,
#[envconfig(from = "ETHEREUM_BLOCK_BATCH_SIZE", default = "10")]
block_batch_size: usize,
#[envconfig(from = "ETHEREUM_BLOCK_PTR_BATCH_SIZE", default = "100")]
block_ptr_batch_size: usize,
#[envconfig(from = "GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE", default = "2000")]
max_block_range_size: BlockNumber,
#[envconfig(from = "GRAPH_ETHEREUM_JSON_RPC_TIMEOUT", default = "180")]
Expand Down
70 changes: 39 additions & 31 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
use graph::blockchain::client::ChainClient;
use graph::blockchain::BlockHash;
use graph::blockchain::ChainIdentifier;
use graph::blockchain::ExtendedBlockPtr;

use graph::components::transaction_receipt::LightTransactionReceipt;
use graph::data::store::ethereum::call;
Expand Down Expand Up @@ -783,11 +784,11 @@ impl EthereumAdapter {
}

/// Request blocks by number through JSON-RPC.
fn load_blocks_by_numbers_rpc(
fn load_block_ptrs_by_numbers_rpc(
&self,
logger: Logger,
numbers: Vec<BlockNumber>,
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
) -> impl Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send {
let web3 = self.web3.clone();

stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
Expand All @@ -798,27 +799,37 @@ impl EthereumAdapter {
.run(move || {
Box::pin(
web3.eth()
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
number.into(),
))),
.block(BlockId::Number(Web3BlockNumber::Number(number.into()))),
)
.compat()
.from_err::<Error>()
.and_then(move |block| {
block.map(Arc::new).ok_or_else(|| {
anyhow::anyhow!(
"Ethereum node did not find block with number {:?}",
number
)
})
block
.map(|block| {
let ptr = ExtendedBlockPtr::try_from((
block.hash,
block.number,
block.parent_hash,
block.timestamp,
))
.unwrap();

Arc::new(ptr)
})
.ok_or_else(|| {
anyhow::anyhow!(
"Ethereum node did not find block with number {:?}",
number
)
})
})
.compat()
})
.boxed()
.compat()
.from_err()
}))
.buffered(ENV_VARS.block_batch_size)
.buffered(ENV_VARS.block_ptr_batch_size)
}

/// Request blocks ptrs for numbers through JSON-RPC.
Expand Down Expand Up @@ -1690,27 +1701,27 @@ impl EthereumAdapterTrait for EthereumAdapter {
}

/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
async fn load_blocks_by_numbers(
async fn load_block_ptrs_by_numbers(
&self,
logger: Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
) -> Box<dyn Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send> {
let blocks_map = chain_store
.cheap_clone()
.blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
.await
.map_err(|e| {
error!(&logger, "Error accessing block cache {}", e);
e
})
.unwrap_or_default();

let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
let mut blocks: Vec<Arc<ExtendedBlockPtr>> = blocks_map
.into_iter()
.filter_map(|(_number, values)| {
if values.len() == 1 {
json::from_value(values[0].clone()).ok()
Arc::new(values[0].clone()).into()
} else {
None
}
Expand All @@ -1719,7 +1730,7 @@ impl EthereumAdapterTrait for EthereumAdapter {

let missing_blocks: Vec<i32> = block_numbers
.into_iter()
.filter(|&number| !blocks.iter().any(|block| block.number() == number))
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
.collect();

if !missing_blocks.is_empty() {
Expand All @@ -1731,20 +1742,9 @@ impl EthereumAdapterTrait for EthereumAdapter {
}

Box::new(
self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks)
.collect()
.map(move |new_blocks| {
let upsert_blocks: Vec<_> = new_blocks
.iter()
.map(|block| BlockFinality::Final(block.clone()))
.collect();
let block_refs: Vec<_> = upsert_blocks
.iter()
.map(|block| block as &dyn graph::blockchain::Block)
.collect();
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
error!(logger, "Error writing to block cache {}", e);
}
blocks.extend(new_blocks);
blocks.sort_by_key(|block| block.number);
stream::iter_ok(blocks)
Expand Down Expand Up @@ -2028,6 +2028,9 @@ pub(crate) async fn get_calls(
calls: Some(calls),
}))
}
BlockFinality::Ptr(_) => {
unreachable!("get_calls called with BlockFinality::Ptr")
}
}
}

Expand Down Expand Up @@ -2209,6 +2212,11 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
"this function should not be called when dealing with non-final blocks"
)
}
BlockFinality::Ptr(_block) => {
unreachable!(
"this function should not be called when dealing with header-only blocks"
)
}
}
};

Expand Down
2 changes: 1 addition & 1 deletion chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
panic!("Should never be called since not used by FirehoseBlockStream")
}

async fn load_blocks_by_numbers(
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
Expand Down
Loading
Loading