diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 80250d7aa31..941112d8219 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -88,13 +88,13 @@ where /// be removed. The same thing also applies to the block cache. /// This function must be called before continuing to process in order to avoid /// duplicated host insertion and POI issues with dirty entity changes. - fn revert_state(&mut self, block_number: BlockNumber) -> Result<(), Error> { + fn revert_state_to(&mut self, block_number: BlockNumber) -> Result<(), Error> { self.state.entity_lfu_cache = LfuCache::new(); - // 1. Revert all hosts(created by DDS) up to block_number inclusively. + // 1. Revert all hosts(created by DDS) at a block higher than `block_number`. // 2. Unmark any offchain data sources that were marked done on the blocks being removed. // When no offchain datasources are present, 2. should be a noop. - self.ctx.revert_data_sources(block_number)?; + self.ctx.revert_data_sources(block_number + 1)?; Ok(()) } @@ -252,7 +252,7 @@ where if let Some(store) = store.restart().await? { let last_good_block = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0); - self.revert_state(last_good_block)?; + self.revert_state_to(last_good_block)?; self.inputs = Arc::new(self.inputs.with_store(store)); self.state.synced = self.inputs.store.is_deployment_synced().await?; } @@ -793,15 +793,7 @@ where } } - if matches!(action, Action::Restart) { - // Cancel the stream for real - self.ctx.instances.remove(&self.inputs.deployment.id); - - // And restart the subgraph - return Ok(Action::Restart); - } - - return Ok(Action::Continue); + return Ok(action); } Err(BlockProcessingError::Canceled) => { debug!(self.logger, "Subgraph block stream shut down cleanly"); @@ -811,7 +803,6 @@ where // Handle unexpected stream errors by marking the subgraph as failed. Err(e) => { self.metrics.stream.deployment_failed.set(1.0); - self.revert_state(block_ptr.block_number())?; let message = format!("{:#}", e).replace('\n', "\t"); let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure); @@ -860,9 +851,6 @@ where // Retry logic below: - // Cancel the stream for real. - self.ctx.instances.remove(&self.inputs.deployment.id); - let message = format!("{:#}", e).replace('\n', "\t"); error!(self.logger, "Subgraph failed with non-deterministic error: {}", message; "attempt" => self.state.backoff.attempt, @@ -1361,7 +1349,7 @@ where .deployment_head .set(subgraph_ptr.number as f64); - self.revert_state(subgraph_ptr.number)?; + self.revert_state_to(revert_to_ptr.number)?; let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr); diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs index ff6e7eef7a0..b62957d1e91 100644 --- a/store/postgres/src/dynds/mod.rs +++ b/store/postgres/src/dynds/mod.rs @@ -36,13 +36,13 @@ pub(crate) fn insert( } } -pub(crate) fn revert_to( +pub(crate) fn revert( conn: &PgConnection, site: &Site, block: BlockNumber, ) -> Result<(), StoreError> { match site.schema_version.private_data_sources() { - true => DataSourcesTable::new(site.namespace.clone()).revert_to(conn, block), + true => DataSourcesTable::new(site.namespace.clone()).revert(conn, block), false => shared::revert(conn, &site.deployment, block), } } diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index ae0d107d097..ad5d46bc5ac 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -190,15 +190,14 @@ impl DataSourcesTable { Ok(inserted_total) } - pub(crate) fn revert_to( - &self, - conn: &PgConnection, - block: BlockNumber, - ) -> Result<(), StoreError> { - // Use `@>` to leverage the gist index. - // This assumes all ranges are of the form [x, +inf). + pub(crate) fn revert(&self, conn: &PgConnection, block: BlockNumber) -> Result<(), StoreError> { + // Use the 'does not extend to the left of' operator `&>` to leverage the gist index, this + // is equivalent to lower(block_range) >= $1. + // + // This assumes all ranges are of the form [x, +inf), and thefore no range needs to be + // unclamped. let query = format!( - "delete from {} where block_range @> $1 and lower(block_range) >= $1", + "delete from {} where block_range &> int4range($1, null)", self.qname ); sql_query(query).bind::(block).execute(conn)?; diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index b25e41beec0..78e878d2fe3 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -832,7 +832,7 @@ impl Layout { site: &Site, block: BlockNumber, ) -> Result<(), StoreError> { - crate::dynds::revert_to(conn, site, block)?; + crate::dynds::revert(conn, site, block)?; crate::deployment::revert_subgraph_errors(conn, &site.deployment, block)?; Ok(()) diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index e3694dd6000..b59d10c9e31 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -85,21 +85,23 @@ pub fn generate_empty_blocks_for_range( parent_ptr: BlockPtr, start: i32, end: i32, -) -> Vec> { - (start + 1..end + 1).fold( - vec![empty_block(parent_ptr.clone(), test_ptr(start))], - |mut blocks, i| { - let parent_ptr = blocks.last().unwrap().ptr().clone(); - blocks.push(empty_block(parent_ptr, test_ptr(i))); - blocks - }, - ) + add_to_hash: u64, // Use to differentiate forks +) -> Vec> { + let mut blocks: Vec> = vec![]; + + for i in start..(end + 1) { + let parent_ptr = blocks.last().map(|b| b.ptr()).unwrap_or(parent_ptr.clone()); + let ptr = BlockPtr { + number: i, + hash: H256::from_low_u64_be(i as u64 + add_to_hash).into(), + }; + blocks.push(empty_block(parent_ptr, ptr)); + } + + blocks } -pub fn empty_block( - parent_ptr: BlockPtr, - ptr: BlockPtr, -) -> BlockWithTriggers { +pub fn empty_block(parent_ptr: BlockPtr, ptr: BlockPtr) -> BlockWithTriggers { assert!(ptr != parent_ptr); assert!(ptr.number > parent_ptr.number); diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index f3c69045791..ee860a5abf5 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -1,9 +1,10 @@ pub mod ethereum; pub mod substreams; +use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Mutex; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::Error; use async_stream::stream; @@ -578,11 +579,13 @@ pub async fn wait_for_sync( stop_block: BlockPtr, ) -> Result<(), SubgraphError> { // We wait one second between checks for the subgraph to sync. That - // means we wait up to a minute here by default + // means we wait up to a 30 seconds here by default. lazy_static! { - static ref MAX_ERR_COUNT: usize = std::env::var("RUNNER_TESTS_WAIT_FOR_SYNC_SECS") - .map(|val| val.parse().unwrap()) - .unwrap_or(60); + static ref MAX_WAIT: Duration = Duration::from_secs( + std::env::var("RUNNER_TESTS_WAIT_FOR_SYNC_SECS") + .map(|val| val.parse().unwrap()) + .unwrap_or(30) + ); } const WAIT_TIME: Duration = Duration::from_secs(1); @@ -601,11 +604,11 @@ pub async fn wait_for_sync( .unwrap(); } - let mut err_count = 0; + let start = Instant::now(); flush(logger, &store, deployment).await; - while err_count < *MAX_ERR_COUNT { + while start.elapsed() < *MAX_WAIT { tokio::time::sleep(WAIT_TIME).await; flush(logger, &store, deployment).await; @@ -613,7 +616,6 @@ pub async fn wait_for_sync( Ok(Some(ptr)) => ptr, res => { info!(&logger, "{:?}", res); - err_count += 1; continue; } }; @@ -621,9 +623,7 @@ pub async fn wait_for_sync( let status = store.status_for_id(deployment.id); if let Some(fatal_error) = status.fatal_error { - if fatal_error.block_ptr.as_ref().unwrap() == &stop_block { - return Err(fatal_error); - } + return Err(fatal_error); } if block_ptr == stop_block { @@ -634,8 +634,7 @@ pub async fn wait_for_sync( // We only get here if we timed out waiting for the subgraph to reach // the stop block - crit!(logger, "TEST: sync never completed (err_count={err_count})"); - panic!("Sync did not complete within {err_count}s"); + panic!("Sync did not complete within {}s", MAX_WAIT.as_secs()); } struct StaticBlockRefetcher { @@ -799,28 +798,47 @@ fn stream_events( where C::TriggerData: Clone, { + struct ForkDb { + blocks: HashMap, + } + + impl ForkDb { + fn common_ancestor(&self, a: BlockPtr, b: BlockPtr) -> Option<&B> { + let mut a = self.blocks.get(&a).unwrap(); + let mut b = self.blocks.get(&b).unwrap(); + while a.number() > b.number() { + dbg!(a.ptr().number); + a = self.blocks.get(&a.parent_ptr()?).unwrap(); + } + while b.number() > a.number() { + dbg!(b.ptr().number); + b = self.blocks.get(&b.parent_ptr()?).unwrap(); + } + while a.hash() != b.hash() { + a = self.blocks.get(&a.parent_ptr()?).unwrap(); + b = self.blocks.get(&b.parent_ptr()?).unwrap(); + } + Some(a) + } + } + + let fork_db = ForkDb { + blocks: blocks.iter().map(|b| (b.ptr(), b.block.clone())).collect(), + }; + // See also: static-stream-builder stream! { - let current_block = current_idx.map(|idx| &blocks[idx]); - let mut current_ptr = current_block.map(|b| b.ptr()); - let mut current_parent_ptr = current_block.and_then(|b| b.parent_ptr()); + let mut current_ptr = current_idx.map(|idx| blocks[idx].ptr()); let skip = current_idx.map(|idx| idx + 1).unwrap_or(0); let mut blocks_iter = blocks.iter().skip(skip).peekable(); while let Some(&block) = blocks_iter.peek() { if block.parent_ptr() == current_ptr { current_ptr = Some(block.ptr()); - current_parent_ptr = block.parent_ptr(); blocks_iter.next(); // Block consumed, advance the iterator. yield Ok(BlockStreamEvent::ProcessBlock(block.clone(), FirehoseCursor::None)); } else { - let revert_to = current_parent_ptr.unwrap(); + let revert_to = fork_db.common_ancestor(block.ptr(), current_ptr.unwrap()).unwrap().ptr(); current_ptr = Some(revert_to.clone()); - current_parent_ptr = blocks - .iter() - .find(|b| b.ptr() == revert_to) - .unwrap() - .block - .parent_ptr(); yield Ok(BlockStreamEvent::Revert(revert_to, FirehoseCursor::None)); } } diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 9dcb66f77de..1c076991e5c 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -180,6 +180,67 @@ async fn data_source_revert() -> anyhow::Result<()> { Some(object! { dataSourceCount: object!{ id: "4", count: 4 } }) ); + // This is an entirely different test, but running it here conveniently avoids race conditions + // since it uses the same deployment id. + data_source_long_revert().await.unwrap(); + + Ok(()) +} + +async fn data_source_long_revert() -> anyhow::Result<()> { + let RunnerTestRecipe { + stores, + test_name, + subgraph_name, + hash, + } = RunnerTestRecipe::new("data_source_long_revert", "data-source-revert").await; + + let blocks = { + let block0 = genesis(); + let blocks_1_to_5 = generate_empty_blocks_for_range(block0.ptr(), 1, 5, 0); + let blocks_1_to_5_reorged = generate_empty_blocks_for_range(block0.ptr(), 1, 5, 1); + + let mut blocks = vec![block0]; + blocks.extend(blocks_1_to_5); + blocks.extend(blocks_1_to_5_reorged); + blocks + }; + let last = blocks.last().unwrap().block.ptr(); + + let chain = chain(&test_name, blocks.clone(), &stores, None).await; + let ctx = fixture::setup( + &test_name, + subgraph_name.clone(), + &hash, + &stores, + &chain, + None, + None, + ) + .await; + + // We sync up to block 5 twice, after the first time there is a revert back to block 1. + // This tests reverts across more than than a single block. + for stop_block in [test_ptr(5), last.clone()] { + ctx.start_and_sync_to(stop_block.clone()).await; + + let query_res = ctx + .query(r#"{ dataSourceCount(id: "5") { id, count } }"#) + .await + .unwrap(); + + // TODO: The semantically correct value for `count` would be 6. But because the test fixture + // uses a `NoopTriggersAdapter` the data sources are not reprocessed in the block in which they + // are created. + assert_eq!( + query_res, + Some(object! { dataSourceCount: object!{ id: "5", count: 5 } }) + ); + } + + // Restart the subgraph once more, which runs more consistency checks on dynamic data sources. + ctx.start_and_sync_to(last).await; + Ok(()) } @@ -552,7 +613,6 @@ async fn end_block() -> anyhow::Result<()> { addr: &Address, should_contain_addr: bool, ) { - dbg!(block_ptr.number, should_contain_addr); let runner = ctx.runner(block_ptr.clone()).await; let runner = runner.run_for_test(false).await.unwrap(); let filter = runner.context().filter.as_ref().unwrap(); @@ -900,7 +960,7 @@ async fn block_handlers() { let blocks = { let block_0 = genesis(); - let block_1_to_3 = generate_empty_blocks_for_range(block_0.ptr(), 1, 3); + let block_1_to_3 = generate_empty_blocks_for_range(block_0.ptr(), 1, 3, 0); let block_4 = { let mut block = empty_block(block_1_to_3.last().unwrap().ptr(), test_ptr(4)); push_test_polling_trigger(&mut block);