Fix data source rewind across more than one block (#5083)
* test: Add failing test for dds revert

* refactor: Remove 'Cancel the stream for real'

The `Action::Restart` will already drop the block stream and replace the guard in
the `instances` map, cancelling it in a very real way.

* fix(runner): Rationalize usage of `fn revert_state`

When reverting, the function should receive the target block of the
revert, so pass `revert_to_ptr` as the parameter and rename the function
to `fn revert_state_to`. When restarting, state should be reverted to the
last known good block, without reverting that block itself (see the
sketch after the file summary below). The usage in the `Err` case was
redundant with the usage during restart.

* fix(revert): Fix long reverts of private dds
leoyvens authored Dec 18, 2023
1 parent fe96516 commit 2d2203e
Showing 7 changed files with 135 additions and 68 deletions.
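To make the renaming described in the commit message concrete, the following is a minimal, self-contained sketch of the two call sites; `Ctx` and `Runner` are stand-ins for the runner's real types, and only the block-number arithmetic mirrors the diff below.

type BlockNumber = i32;

// Stand-in for the runner context; data sources are keyed by their creation block.
struct Ctx {
    data_source_creation_blocks: Vec<BlockNumber>,
}

impl Ctx {
    // Mirrors `revert_data_sources`: drop everything created at `from` or later.
    fn revert_data_sources(&mut self, from: BlockNumber) {
        self.data_source_creation_blocks.retain(|&b| b < from);
    }
}

struct Runner {
    ctx: Ctx,
}

impl Runner {
    // Mirrors `revert_state_to`: `block_number` itself survives; everything
    // above it is removed, hence the `+ 1` when delegating to the store layer.
    fn revert_state_to(&mut self, block_number: BlockNumber) {
        self.ctx.revert_data_sources(block_number + 1);
    }
}

fn main() {
    let mut runner = Runner {
        ctx: Ctx {
            data_source_creation_blocks: vec![1, 3, 5],
        },
    };

    // Revert path: the argument is the block being reverted *to*.
    runner.revert_state_to(3);
    assert_eq!(runner.ctx.data_source_creation_blocks, vec![1, 3]);

    // Restart path: fall back to the last block the store knows is good.
    let last_good_block: Option<BlockNumber> = None;
    runner.revert_state_to(last_good_block.unwrap_or(0));
    assert!(runner.ctx.data_source_creation_blocks.is_empty());
}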
24 changes: 6 additions & 18 deletions core/src/subgraph/runner.rs
@@ -88,13 +88,13 @@ where
/// be removed. The same thing also applies to the block cache.
/// This function must be called before continuing to process in order to avoid
/// duplicated host insertion and POI issues with dirty entity changes.
fn revert_state(&mut self, block_number: BlockNumber) -> Result<(), Error> {
fn revert_state_to(&mut self, block_number: BlockNumber) -> Result<(), Error> {
self.state.entity_lfu_cache = LfuCache::new();

// 1. Revert all hosts (created by DDS) up to block_number inclusively.
// 1. Revert all hosts (created by DDS) at a block higher than `block_number`.
// 2. Unmark any offchain data sources that were marked done on the blocks being removed.
// When no offchain datasources are present, 2. should be a noop.
self.ctx.revert_data_sources(block_number)?;
self.ctx.revert_data_sources(block_number + 1)?;
Ok(())
}

@@ -252,7 +252,7 @@ where
if let Some(store) = store.restart().await? {
let last_good_block =
store.block_ptr().map(|ptr| ptr.number).unwrap_or(0);
self.revert_state(last_good_block)?;
self.revert_state_to(last_good_block)?;
self.inputs = Arc::new(self.inputs.with_store(store));
self.state.synced = self.inputs.store.is_deployment_synced().await?;
}
@@ -793,15 +793,7 @@ where
}
}

if matches!(action, Action::Restart) {
// Cancel the stream for real
self.ctx.instances.remove(&self.inputs.deployment.id);

// And restart the subgraph
return Ok(Action::Restart);
}

return Ok(Action::Continue);
return Ok(action);
}
Err(BlockProcessingError::Canceled) => {
debug!(self.logger, "Subgraph block stream shut down cleanly");
@@ -811,7 +803,6 @@ where
// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
self.metrics.stream.deployment_failed.set(1.0);
self.revert_state(block_ptr.block_number())?;

let message = format!("{:#}", e).replace('\n', "\t");
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
Expand Down Expand Up @@ -860,9 +851,6 @@ where

// Retry logic below:

// Cancel the stream for real.
self.ctx.instances.remove(&self.inputs.deployment.id);

let message = format!("{:#}", e).replace('\n', "\t");
error!(self.logger, "Subgraph failed with non-deterministic error: {}", message;
"attempt" => self.state.backoff.attempt,
@@ -1361,7 +1349,7 @@ where
.deployment_head
.set(subgraph_ptr.number as f64);

self.revert_state(subgraph_ptr.number)?;
self.revert_state_to(revert_to_ptr.number)?;

let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr);

4 changes: 2 additions & 2 deletions store/postgres/src/dynds/mod.rs
@@ -36,13 +36,13 @@ pub(crate) fn insert(
}
}

pub(crate) fn revert_to(
pub(crate) fn revert(
conn: &PgConnection,
site: &Site,
block: BlockNumber,
) -> Result<(), StoreError> {
match site.schema_version.private_data_sources() {
true => DataSourcesTable::new(site.namespace.clone()).revert_to(conn, block),
true => DataSourcesTable::new(site.namespace.clone()).revert(conn, block),
false => shared::revert(conn, &site.deployment, block),
}
}
15 changes: 7 additions & 8 deletions store/postgres/src/dynds/private.rs
@@ -190,15 +190,14 @@ impl DataSourcesTable {
Ok(inserted_total)
}

pub(crate) fn revert_to(
&self,
conn: &PgConnection,
block: BlockNumber,
) -> Result<(), StoreError> {
// Use `@>` to leverage the gist index.
// This assumes all ranges are of the form [x, +inf).
pub(crate) fn revert(&self, conn: &PgConnection, block: BlockNumber) -> Result<(), StoreError> {
// Use the 'does not extend to the left of' operator `&>` to leverage the gist index; this
// is equivalent to lower(block_range) >= $1.
//
// This assumes all ranges are of the form [x, +inf), and therefore no range needs to be
// unclamped.
let query = format!(
"delete from {} where block_range @> $1 and lower(block_range) >= $1",
"delete from {} where block_range &> int4range($1, null)",
self.qname
);
sql_query(query).bind::<Integer, _>(block).execute(conn)?;
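For context, here is a small standalone sketch (not part of the commit) of the statement the new `fn revert` builds; the table name is hypothetical, and the point is only the operator semantics: for ranges of the form [x, +inf), `block_range &> int4range($1, null)` matches exactly the rows with lower(block_range) >= $1, while letting the planner use the gist index on `block_range`.

// Builds the same delete statement shape as `DataSourcesTable::revert`.
// `qname` stands in for a qualified table name and is purely illustrative.
fn revert_query(qname: &str) -> String {
    // `&>` ("does not extend to the left of") is equivalent here to
    // `lower(block_range) >= $1`, but can be answered from the gist index.
    format!(
        "delete from {} where block_range &> int4range($1, null)",
        qname
    )
}

fn main() {
    assert_eq!(
        revert_query("sgd_example.data_sources"),
        "delete from sgd_example.data_sources where block_range &> int4range($1, null)"
    );
}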
2 changes: 1 addition & 1 deletion store/postgres/src/relational.rs
@@ -832,7 +832,7 @@ impl Layout {
site: &Site,
block: BlockNumber,
) -> Result<(), StoreError> {
crate::dynds::revert_to(conn, site, block)?;
crate::dynds::revert(conn, site, block)?;
crate::deployment::revert_subgraph_errors(conn, &site.deployment, block)?;

Ok(())
28 changes: 15 additions & 13 deletions tests/src/fixture/ethereum.rs
@@ -85,21 +85,23 @@ pub fn generate_empty_blocks_for_range(
parent_ptr: BlockPtr,
start: i32,
end: i32,
) -> Vec<BlockWithTriggers<graph_chain_ethereum::Chain>> {
(start + 1..end + 1).fold(
vec![empty_block(parent_ptr.clone(), test_ptr(start))],
|mut blocks, i| {
let parent_ptr = blocks.last().unwrap().ptr().clone();
blocks.push(empty_block(parent_ptr, test_ptr(i)));
blocks
},
)
add_to_hash: u64, // Used to differentiate forks
) -> Vec<BlockWithTriggers<Chain>> {
let mut blocks: Vec<BlockWithTriggers<Chain>> = vec![];

for i in start..(end + 1) {
let parent_ptr = blocks.last().map(|b| b.ptr()).unwrap_or(parent_ptr.clone());
let ptr = BlockPtr {
number: i,
hash: H256::from_low_u64_be(i as u64 + add_to_hash).into(),
};
blocks.push(empty_block(parent_ptr, ptr));
}

blocks
}

pub fn empty_block(
parent_ptr: BlockPtr,
ptr: BlockPtr,
) -> BlockWithTriggers<graph_chain_ethereum::Chain> {
pub fn empty_block(parent_ptr: BlockPtr, ptr: BlockPtr) -> BlockWithTriggers<Chain> {
assert!(ptr != parent_ptr);
assert!(ptr.number > parent_ptr.number);

66 changes: 42 additions & 24 deletions tests/src/fixture/mod.rs
@@ -1,9 +1,10 @@
pub mod ethereum;
pub mod substreams;

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Mutex;
use std::time::Duration;
use std::time::{Duration, Instant};

use anyhow::Error;
use async_stream::stream;
@@ -578,11 +579,13 @@ pub async fn wait_for_sync(
stop_block: BlockPtr,
) -> Result<(), SubgraphError> {
// We wait one second between checks for the subgraph to sync. That
// means we wait up to a minute here by default
// means we wait up to 30 seconds here by default.
lazy_static! {
static ref MAX_ERR_COUNT: usize = std::env::var("RUNNER_TESTS_WAIT_FOR_SYNC_SECS")
.map(|val| val.parse().unwrap())
.unwrap_or(60);
static ref MAX_WAIT: Duration = Duration::from_secs(
std::env::var("RUNNER_TESTS_WAIT_FOR_SYNC_SECS")
.map(|val| val.parse().unwrap())
.unwrap_or(30)
);
}
const WAIT_TIME: Duration = Duration::from_secs(1);

@@ -601,29 +604,26 @@ pub async fn wait_for_sync(
.unwrap();
}

let mut err_count = 0;
let start = Instant::now();

flush(logger, &store, deployment).await;

while err_count < *MAX_ERR_COUNT {
while start.elapsed() < *MAX_WAIT {
tokio::time::sleep(WAIT_TIME).await;
flush(logger, &store, deployment).await;

let block_ptr = match store.least_block_ptr(&deployment.hash).await {
Ok(Some(ptr)) => ptr,
res => {
info!(&logger, "{:?}", res);
err_count += 1;
continue;
}
};
info!(logger, "TEST: sync status: {:?}", block_ptr);
let status = store.status_for_id(deployment.id);

if let Some(fatal_error) = status.fatal_error {
if fatal_error.block_ptr.as_ref().unwrap() == &stop_block {
return Err(fatal_error);
}
return Err(fatal_error);
}

if block_ptr == stop_block {
@@ -634,8 +634,7 @@

// We only get here if we timed out waiting for the subgraph to reach
// the stop block
crit!(logger, "TEST: sync never completed (err_count={err_count})");
panic!("Sync did not complete within {err_count}s");
panic!("Sync did not complete within {}s", MAX_WAIT.as_secs());
}

struct StaticBlockRefetcher<C: Blockchain> {
@@ -799,28 +798,47 @@ fn stream_events<C: Blockchain>(
where
C::TriggerData: Clone,
{
struct ForkDb<B: Block> {
blocks: HashMap<BlockPtr, B>,
}

impl<B: Block> ForkDb<B> {
fn common_ancestor(&self, a: BlockPtr, b: BlockPtr) -> Option<&B> {
let mut a = self.blocks.get(&a).unwrap();
let mut b = self.blocks.get(&b).unwrap();
while a.number() > b.number() {
dbg!(a.ptr().number);
a = self.blocks.get(&a.parent_ptr()?).unwrap();
}
while b.number() > a.number() {
dbg!(b.ptr().number);
b = self.blocks.get(&b.parent_ptr()?).unwrap();
}
while a.hash() != b.hash() {
a = self.blocks.get(&a.parent_ptr()?).unwrap();
b = self.blocks.get(&b.parent_ptr()?).unwrap();
}
Some(a)
}
}

let fork_db = ForkDb {
blocks: blocks.iter().map(|b| (b.ptr(), b.block.clone())).collect(),
};

// See also: static-stream-builder
stream! {
let current_block = current_idx.map(|idx| &blocks[idx]);
let mut current_ptr = current_block.map(|b| b.ptr());
let mut current_parent_ptr = current_block.and_then(|b| b.parent_ptr());
let mut current_ptr = current_idx.map(|idx| blocks[idx].ptr());
let skip = current_idx.map(|idx| idx + 1).unwrap_or(0);
let mut blocks_iter = blocks.iter().skip(skip).peekable();
while let Some(&block) = blocks_iter.peek() {
if block.parent_ptr() == current_ptr {
current_ptr = Some(block.ptr());
current_parent_ptr = block.parent_ptr();
blocks_iter.next(); // Block consumed, advance the iterator.
yield Ok(BlockStreamEvent::ProcessBlock(block.clone(), FirehoseCursor::None));
} else {
let revert_to = current_parent_ptr.unwrap();
let revert_to = fork_db.common_ancestor(block.ptr(), current_ptr.unwrap()).unwrap().ptr();
current_ptr = Some(revert_to.clone());
current_parent_ptr = blocks
.iter()
.find(|b| b.ptr() == revert_to)
.unwrap()
.block
.parent_ptr();
yield Ok(BlockStreamEvent::Revert(revert_to, FirehoseCursor::None));
}
}
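The `common_ancestor` walk above is a standard two-pointer reorg resolution: bring both pointers down to the same height, then step both chains back in lockstep until the hashes agree. A toy, self-contained illustration (none of the fixture's types), applied to a fork shaped like the one used in the new runner test below:

use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Ptr {
    number: i32,
    hash: u64,
}

// Maps each block pointer to its parent pointer.
struct ForkDb {
    parents: HashMap<Ptr, Ptr>,
}

impl ForkDb {
    fn common_ancestor(&self, mut a: Ptr, mut b: Ptr) -> Option<Ptr> {
        // First equalize the heights, then walk both chains back together.
        while a.number > b.number {
            a = self.parents.get(&a)?.clone();
        }
        while b.number > a.number {
            b = self.parents.get(&b)?.clone();
        }
        while a.hash != b.hash {
            a = self.parents.get(&a)?.clone();
            b = self.parents.get(&b)?.clone();
        }
        Some(a)
    }
}

fn main() {
    // Genesis plus two forks of length two, mirroring the reorg shape in the test.
    let genesis = Ptr { number: 0, hash: 0 };
    let a1 = Ptr { number: 1, hash: 1 };
    let a2 = Ptr { number: 2, hash: 2 };
    let b1 = Ptr { number: 1, hash: 101 };
    let b2 = Ptr { number: 2, hash: 102 };
    let parents = HashMap::from([
        (a1.clone(), genesis.clone()),
        (a2.clone(), a1.clone()),
        (b1.clone(), genesis.clone()),
        (b2.clone(), b1.clone()),
    ]);
    let db = ForkDb { parents };

    // The chains share only the genesis block, so the revert target is block 0.
    assert_eq!(db.common_ancestor(a2, b2), Some(genesis));
}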
64 changes: 62 additions & 2 deletions tests/tests/runner_tests.rs
@@ -180,6 +180,67 @@ async fn data_source_revert() -> anyhow::Result<()> {
Some(object! { dataSourceCount: object!{ id: "4", count: 4 } })
);

// This is an entirely different test, but running it here conveniently avoids race conditions
// since it uses the same deployment id.
data_source_long_revert().await.unwrap();

Ok(())
}

async fn data_source_long_revert() -> anyhow::Result<()> {
let RunnerTestRecipe {
stores,
test_name,
subgraph_name,
hash,
} = RunnerTestRecipe::new("data_source_long_revert", "data-source-revert").await;

let blocks = {
let block0 = genesis();
let blocks_1_to_5 = generate_empty_blocks_for_range(block0.ptr(), 1, 5, 0);
let blocks_1_to_5_reorged = generate_empty_blocks_for_range(block0.ptr(), 1, 5, 1);

let mut blocks = vec![block0];
blocks.extend(blocks_1_to_5);
blocks.extend(blocks_1_to_5_reorged);
blocks
};
let last = blocks.last().unwrap().block.ptr();

let chain = chain(&test_name, blocks.clone(), &stores, None).await;
let ctx = fixture::setup(
&test_name,
subgraph_name.clone(),
&hash,
&stores,
&chain,
None,
None,
)
.await;

// We sync up to block 5 twice; after the first time there is a revert back to block 1.
// This tests reverts across more than a single block.
for stop_block in [test_ptr(5), last.clone()] {
ctx.start_and_sync_to(stop_block.clone()).await;

let query_res = ctx
.query(r#"{ dataSourceCount(id: "5") { id, count } }"#)
.await
.unwrap();

// TODO: The semantically correct value for `count` would be 6. But because the test fixture
// uses a `NoopTriggersAdapter` the data sources are not reprocessed in the block in which they
// are created.
assert_eq!(
query_res,
Some(object! { dataSourceCount: object!{ id: "5", count: 5 } })
);
}

// Restart the subgraph once more, which runs more consistency checks on dynamic data sources.
ctx.start_and_sync_to(last).await;

Ok(())
}

@@ -552,7 +613,6 @@ async fn end_block() -> anyhow::Result<()> {
addr: &Address,
should_contain_addr: bool,
) {
dbg!(block_ptr.number, should_contain_addr);
let runner = ctx.runner(block_ptr.clone()).await;
let runner = runner.run_for_test(false).await.unwrap();
let filter = runner.context().filter.as_ref().unwrap();
@@ -900,7 +960,7 @@ async fn block_handlers() {

let blocks = {
let block_0 = genesis();
let block_1_to_3 = generate_empty_blocks_for_range(block_0.ptr(), 1, 3);
let block_1_to_3 = generate_empty_blocks_for_range(block_0.ptr(), 1, 3, 0);
let block_4 = {
let mut block = empty_block(block_1_to_3.last().unwrap().ptr(), test_ptr(4));
push_test_polling_trigger(&mut block);
