Skip to content

Commit

Permalink
Track event indexing progress (#3075)
Browse files Browse the repository at this point in the history
# Description
Our event indexing logic currently can't handle some cases well during
restarts. Let's say a contract we index doesn't emit an event for 1
month our current logic would fetch the last indexed event from that
contract and continue indexing from there. That means on restarts we
might index ~1 month (theoretically unbounded) of blocks before we can
start building new auctions (we recently started to require the indexing
logic to have seen the tip of the chain before we can start building
auctions).
Especially on fast chains like `arbitrum` this is a huge issue.

Dusts off @squadgazzz's previous
[PR](#2965)

# Changes
* introduced a new table `last_processed_blocks` which associates a
block number for an arbitrary string
* populated table with data for `settlements`, `ethflow_refunds`, and
`onchain_orders`

## How to test
- 1 database specific test
- actually not sure how to best test this e2e. We could spawn an
autopilot, index an event, mint a ton of empty blocks, and restart the
autopilot to trigger the code we are interested in. But it's not super
clear to me what would be a good assertion that can be tested with
reasonable effort. 🤔

---------

Co-authored-by: ilya <[email protected]>
  • Loading branch information
MartinquaXD and squadgazzz authored Oct 23, 2024
1 parent 11df525 commit fad84f1
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 130 deletions.
25 changes: 25 additions & 0 deletions crates/autopilot/src/boundary/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,26 @@
use {
anyhow::{Context, Result},
sqlx::PgPool,
};

pub mod settlement;

pub async fn write_last_block_to_db(db: &PgPool, last_block: u64, contract: &str) -> Result<()> {
let mut ex = db.acquire().await?;
database::last_indexed_blocks::update(
&mut ex,
contract,
i64::try_from(last_block).context("new value of counter is not i64")?,
)
.await?;
Ok(())
}

pub async fn read_last_block_from_db(db: &PgPool, contract: &str) -> Result<u64> {
let mut ex = db.acquire().await?;
database::last_indexed_blocks::fetch(&mut ex, contract)
.await?
.unwrap_or_default()
.try_into()
.context("last block is not u64")
}
10 changes: 8 additions & 2 deletions crates/autopilot/src/boundary/events/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ impl Indexer {
}
}

/// This name is used to store the latest indexed block in the db.
const INDEX_NAME: &str = "settlements";

#[async_trait::async_trait]
impl EventStoring<contracts::gpv2_settlement::Event> for Indexer {
async fn last_event_block(&self) -> Result<u64> {
let mut con = self.db.pool.acquire().await?;
crate::database::events::last_event_block(&mut con).await
super::read_last_block_from_db(&self.db.pool, INDEX_NAME).await
}

async fn persist_last_indexed_block(&mut self, latest_block: u64) -> Result<()> {
super::write_last_block_to_db(&self.db.pool, latest_block, INDEX_NAME).await
}

async fn replace_events(
Expand Down
11 changes: 8 additions & 3 deletions crates/autopilot/src/database/ethflow_events/event_storing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ fn get_refunds(events: Vec<ethcontract::Event<EthFlowEvent>>) -> Result<Vec<Refu

type EthFlowEvent = contracts::cowswap_eth_flow::Event;

/// This name is used to store the latest indexed block in the db.
const INDEX_NAME: &str = "ethflow_refunds";

#[async_trait::async_trait]
impl EventStoring<EthFlowEvent> for Postgres {
async fn last_event_block(&self) -> Result<u64> {
let mut ex = self.pool.acquire().await?;
let block = database::ethflow_orders::last_indexed_block(&mut ex).await?;
Ok(block.unwrap_or_default() as u64)
crate::boundary::events::read_last_block_from_db(&self.pool, INDEX_NAME).await
}

async fn persist_last_indexed_block(&mut self, last_block: u64) -> Result<()> {
crate::boundary::events::write_last_block_to_db(&self.pool, last_block, INDEX_NAME).await
}

async fn append_events(&mut self, events: Vec<ethcontract::Event<EthFlowEvent>>) -> Result<()> {
Expand Down
13 changes: 0 additions & 13 deletions crates/autopilot/src/database/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use {
},
ethcontract::{Event as EthContractEvent, EventMetadata},
number::conversions::u256_to_big_decimal,
sqlx::PgConnection,
std::convert::TryInto,
};

Expand All @@ -43,18 +42,6 @@ pub fn contract_to_db_events(
.collect::<Result<Vec<_>>>()
}

pub async fn last_event_block(connection: &mut PgConnection) -> Result<u64> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();

let block_number = database::events::last_block(connection)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}

pub async fn append_events(
transaction: &mut PgTransaction<'_>,
events: Vec<EthContractEvent<ContractEvent>>,
Expand Down
33 changes: 20 additions & 13 deletions crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,30 @@ where
) -> EventRow;
}

/// This name is used to store the latest indexed block in the db.
const INDEX_NAME: &str = "onchain_orders";

#[async_trait::async_trait]
impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
for OnchainOrderParser<T, W>
{
async fn last_event_block(&self) -> Result<u64> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["read_last_block_onchain_orders"])
.start_timer();
crate::boundary::events::read_last_block_from_db(&self.db.pool, INDEX_NAME).await
}

async fn persist_last_indexed_block(&mut self, latest_block: u64) -> Result<()> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["update_last_block_onchain_orders"])
.start_timer();
crate::boundary::events::write_last_block_to_db(&self.db.pool, latest_block, INDEX_NAME)
.await
}

async fn replace_events(
&mut self,
events: Vec<EthContractEvent<ContractEvent>>,
Expand Down Expand Up @@ -183,19 +203,6 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>

Ok(())
}

async fn last_event_block(&self) -> Result<u64> {
let _timer = DatabaseMetrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();

let mut con = self.db.pool.acquire().await?;
let block_number = database::onchain_broadcasted_orders::last_block(&mut con)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}
}

impl<T: Send + Sync + Clone, W: Send + Sync> OnchainOrderParser<T, W> {
Expand Down
5 changes: 5 additions & 0 deletions crates/cow-amm/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ impl EventStoring<CowAmmEvent> for Storage {
.unwrap_or(self.0.start_of_index);
Ok(last_block)
}

async fn persist_last_indexed_block(&mut self, _new_value: u64) -> anyhow::Result<()> {
// storage is only in-memory so we don't need to persist anything here
Ok(())
}
}
98 changes: 0 additions & 98 deletions crates/database/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ pub struct EventIndex {
pub log_index: i64,
}

pub async fn last_block(ex: &mut PgConnection) -> Result<i64, sqlx::Error> {
const QUERY: &str = "\
SELECT GREATEST( (SELECT COALESCE(MAX(block_number), 0) FROM trades), (SELECT \
COALESCE(MAX(block_number), 0) FROM settlements), (SELECT \
COALESCE(MAX(block_number), 0) FROM invalidations), (SELECT \
COALESCE(MAX(block_number), 0) FROM presignature_events));";
sqlx::query_scalar(QUERY).fetch_one(ex).await
}

pub async fn delete(
ex: &mut PgTransaction<'_>,
delete_from_block_number: u64,
Expand Down Expand Up @@ -171,92 +162,3 @@ async fn insert_presignature(
.await?;
Ok(())
}

#[cfg(test)]
mod tests {
use {super::*, sqlx::Connection};

#[tokio::test]
#[ignore]
async fn postgres_events() {
let mut db = PgConnection::connect("postgresql://").await.unwrap();
let mut db = db.begin().await.unwrap();
crate::clear_DANGER_(&mut db).await.unwrap();

assert_eq!(last_block(&mut db).await.unwrap(), 0);

let mut event_index = EventIndex {
block_number: 1,
log_index: 0,
};
append(
&mut db,
&[(event_index, Event::Invalidation(Default::default()))],
)
.await
.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 1);

event_index.block_number = 2;
append(&mut db, &[(event_index, Event::Trade(Default::default()))])
.await
.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 2);

event_index.block_number = 3;
append(
&mut db,
&[(event_index, Event::PreSignature(Default::default()))],
)
.await
.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 3);

event_index.block_number = 4;
append(
&mut db,
&[(event_index, Event::Settlement(Default::default()))],
)
.await
.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 4);

delete(&mut db, 5).await.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 4);

delete(&mut db, 3).await.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 2);

delete(&mut db, 0).await.unwrap();
assert_eq!(last_block(&mut db).await.unwrap(), 0);
}

#[tokio::test]
#[ignore]
async fn postgres_repeated_event_insert_ignored() {
let mut db = PgConnection::connect("postgresql://").await.unwrap();
let mut db = db.begin().await.unwrap();
crate::clear_DANGER_(&mut db).await.unwrap();
async fn append(con: &mut PgTransaction<'_>, log_index: i64, event: Event) {
super::append(
con,
&[(
EventIndex {
block_number: 2,
log_index,
},
event,
)],
)
.await
.unwrap()
}
for _ in 0..2 {
append(&mut db, 0, Event::Trade(Default::default())).await;
append(&mut db, 1, Event::Invalidation(Default::default())).await;
append(&mut db, 2, Event::Settlement(Default::default())).await;
append(&mut db, 3, Event::PreSignature(Default::default())).await;
}
assert_eq!(last_block(&mut db).await.unwrap(), 2);
}
}
52 changes: 52 additions & 0 deletions crates/database/src/last_indexed_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use sqlx::{Executor, PgConnection};

pub async fn update(
ex: &mut PgConnection,
contract: &str,
last_indexed_block: i64,
) -> Result<(), sqlx::Error> {
const QUERY: &str = r#"
INSERT INTO last_indexed_blocks (contract, block_number)
VALUES ($1, $2)
ON CONFLICT (contract)
DO UPDATE SET block_number = EXCLUDED.block_number;
"#;

ex.execute(sqlx::query(QUERY).bind(contract).bind(last_indexed_block))
.await?;
Ok(())
}

pub async fn fetch(ex: &mut PgConnection, contract: &str) -> Result<Option<i64>, sqlx::Error> {
const QUERY: &str = r#"
SELECT block_number
FROM last_indexed_blocks
WHERE contract = $1;
"#;

sqlx::query_scalar(QUERY)
.bind(contract)
.fetch_optional(ex)
.await
}

#[cfg(test)]
mod tests {
use {super::*, sqlx::Connection};

#[tokio::test]
#[ignore]
async fn postgres_last_indexed_block_roundtrip() {
let mut db = PgConnection::connect("postgresql://").await.unwrap();
let mut db = db.begin().await.unwrap();
crate::clear_DANGER_(&mut db).await.unwrap();

assert_eq!(fetch(&mut db, "test").await.unwrap(), None);

update(&mut db, "test", 42).await.unwrap();
assert_eq!(fetch(&mut db, "test").await.unwrap(), Some(42));

update(&mut db, "test", 43).await.unwrap();
assert_eq!(fetch(&mut db, "test").await.unwrap(), Some(43));
}
}
2 changes: 2 additions & 0 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod ethflow_orders;
pub mod events;
pub mod fee_policies;
pub mod jit_orders;
pub mod last_indexed_blocks;
pub mod onchain_broadcasted_orders;
pub mod onchain_invalidations;
pub mod order_events;
Expand Down Expand Up @@ -52,6 +53,7 @@ pub const TABLES: &[&str] = &[
"orders",
"trades",
"invalidations",
"last_indexed_blocks",
"quotes",
"settlements",
"presignature_events",
Expand Down
16 changes: 15 additions & 1 deletion crates/shared/src/event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ pub trait EventStoring<T>: Send + Sync {
/// * `events` the contract events to be appended by the implementer
async fn append_events(&mut self, events: Vec<EthcontractEvent<T>>) -> Result<()>;

/// Fetches the last processed block to know where to resume indexing after
/// a restart.
async fn last_event_block(&self) -> Result<u64>;

/// Stores the last processed block to know where to resume indexing after a
/// restart.
async fn persist_last_indexed_block(&mut self, last_block: u64) -> Result<()>;
}

pub trait EventRetrieving {
Expand Down Expand Up @@ -282,9 +288,12 @@ where
if let Some(range) = event_range.history_range {
self.update_events_from_old_blocks(range).await?;
}
if !event_range.latest_blocks.is_empty() {
if let Some(last_block) = event_range.latest_blocks.last() {
self.update_events_from_latest_blocks(&event_range.latest_blocks, event_range.is_reorg)
.await?;
self.store_mut()
.persist_last_indexed_block(last_block.0)
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -652,6 +661,11 @@ mod tests {
.map(|event| event.meta.clone().unwrap().block_number)
.unwrap_or_default())
}

async fn persist_last_indexed_block(&mut self, _last_block: u64) -> Result<()> {
// Nothing to do here since `last_event_block` looks up last stored event.
Ok(())
}
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ where
async fn last_event_block(&self) -> Result<u64> {
Ok(self.last_event_block())
}

async fn persist_last_indexed_block(&mut self, _block: u64) -> Result<()> {
// storage is only in-memory so we don't need to persist anything here
Ok(())
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions crates/shared/src/sources/uniswap_v3/event_fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ impl EventStoring<UniswapV3Event> for RecentEventsCache {
.cloned()
.context("no events")
}

async fn persist_last_indexed_block(&mut self, _block: u64) -> Result<()> {
// storage is only in-memory so we don't need to persist anything here
Ok(())
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit fad84f1

Please sign in to comment.