Skip to content

Commit

Permalink
[iota-indexer]: Adapt transactions queries to lack of index on digest…
Browse files Browse the repository at this point in the history
… column (#4586)
  • Loading branch information
tomxey authored Jan 7, 2025
1 parent bcaf173 commit 5c27c82
Showing 1 changed file with 110 additions and 41 deletions.
151 changes: 110 additions & 41 deletions crates/iota-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::{
use anyhow::{Result, anyhow};
use cached::{Cached, SizedCache};
use diesel::{
ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, TextExpressionMethods,
ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, QueryDsl,
RunQueryDsl, SelectableHelper, TextExpressionMethods,
dsl::sql,
r2d2::{ConnectionManager, R2D2Connection},
sql_types::Bool,
Expand Down Expand Up @@ -63,7 +64,7 @@ use crate::{
},
schema::{
address_metrics, checkpoints, display, epochs, events, move_call_metrics, objects,
objects_snapshot, packages, transactions,
objects_snapshot, packages, pruner_cp_watermark, transactions, tx_digests,
},
store::{diesel_macro::*, package_resolver::IndexerStorePackageResolver},
types::{IndexerResult, OwnerType},
Expand Down Expand Up @@ -461,7 +462,16 @@ impl<U: R2D2Connection> IndexerReader<U> {
) -> Result<IotaTransactionBlockEffects, IndexerError> {
let mut stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
transactions::table
.filter(transactions::transaction_digest.eq(digest.into_inner().to_vec()))
.filter(
transactions::tx_sequence_number
.nullable()
.eq(tx_digests::table
.select(tx_digests::tx_sequence_number)
// we filter the tx_digests table because it is indexed by digest,
// transactions table is not
.filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
.single_value()),
)
.first::<StoredTransaction>(conn)
})?;

Expand Down Expand Up @@ -497,7 +507,14 @@ impl<U: R2D2Connection> IndexerReader<U> {
.collect::<Vec<_>>();
let transactions = run_query!(&self.pool, |conn| {
transactions::table
.filter(transactions::transaction_digest.eq_any(digests))
.inner_join(
tx_digests::table
.on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
)
// we filter the tx_digests table because it is indexed by digest,
// transactions table is not
.filter(tx_digests::tx_digest.eq_any(digests))
.select(StoredTransaction::as_select())
.load::<StoredTransaction>(conn)
})?;
if cfg!(feature = "postgres-feature") {
Expand Down Expand Up @@ -722,8 +739,21 @@ impl<U: R2D2Connection> IndexerReader<U> {
limit: usize,
is_descending: bool,
) -> IndexerResult<Vec<IotaTransactionBlockResponse>> {
let pool = self.get_pool();
let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
pruner_cp_watermark::dsl::pruner_cp_watermark
.select((
pruner_cp_watermark::min_tx_sequence_number,
pruner_cp_watermark::max_tx_sequence_number,
))
// we filter the pruner_cp_watermark table because it is indexed by
// checkpoint_sequence_number, transactions is not
.filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
.first::<(i64, i64)>(conn)
})?;

let mut query = transactions::dsl::transactions
.filter(transactions::dsl::checkpoint_sequence_number.eq(checkpoint_seq as i64))
.filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
.into_boxed();

// Translate transaction digest cursor to tx sequence number
Expand Down Expand Up @@ -778,9 +808,11 @@ impl<U: R2D2Connection> IndexerReader<U> {
let cursor_tx_seq = if let Some(cursor) = cursor {
let pool = self.get_pool();
let tx_seq = run_query_async!(&pool, move |conn| {
transactions::dsl::transactions
.select(transactions::tx_sequence_number)
.filter(transactions::dsl::transaction_digest.eq(cursor.into_inner().to_vec()))
tx_digests::table
.select(tx_digests::tx_sequence_number)
// we filter the tx_digests table because it is indexed by digest,
// transactions (and other tables) are not
.filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
.first::<i64>(conn)
})?;
Some(tx_seq)
Expand Down Expand Up @@ -1019,7 +1051,16 @@ impl<U: R2D2Connection> IndexerReader<U> {
let pool = self.get_pool();
let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
transactions::table
.filter(transactions::transaction_digest.eq(digest.into_inner().to_vec()))
.filter(
transactions::tx_sequence_number
.nullable()
.eq(tx_digests::table
.select(tx_digests::tx_sequence_number)
// we filter the tx_digests table because it is indexed by digest,
// transactions table is not
.filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
.single_value()),
)
.select((transactions::timestamp_ms, transactions::events))
.first::<(i64, StoredTransactionEvents)>(conn)
})?;
Expand All @@ -1037,43 +1078,68 @@ impl<U: R2D2Connection> IndexerReader<U> {
Ok(iota_tx_events.map_or(vec![], |ste| ste.data))
}

fn query_events_by_tx_digest_query(
async fn query_events_by_tx_digest(
&self,
tx_digest: TransactionDigest,
cursor: Option<EventID>,
limit: usize,
descending_order: bool,
) -> IndexerResult<String> {
let cursor = if let Some(cursor) = cursor {
) -> IndexerResult<Vec<IotaEvent>> {
let mut query = events::table.into_boxed();

if let Some(cursor) = cursor {
if cursor.tx_digest != tx_digest {
return Err(IndexerError::InvalidArgument(
"Cursor tx_digest does not match the tx_digest in the query.".into(),
));
}
if descending_order {
format!("e.{EVENT_SEQUENCE_NUMBER_STR} < {}", cursor.event_seq)
query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
} else {
format!("e.{EVENT_SEQUENCE_NUMBER_STR} > {}", cursor.event_seq)
query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
}
} else if descending_order {
format!("e.{EVENT_SEQUENCE_NUMBER_STR} <= {}", i64::MAX)
query = query.filter(events::event_sequence_number.le(i64::MAX));
} else {
format!("e.{EVENT_SEQUENCE_NUMBER_STR} >= {}", 0)
query = query.filter(events::event_sequence_number.ge(0));
};

let order_clause = if descending_order { "DESC" } else { "ASC" };
Ok(format!(
"SELECT * \
FROM EVENTS e \
JOIN TRANSACTIONS t \
ON t.tx_sequence_number = e.tx_sequence_number \
AND t.transaction_digest = '\\x{}'::bytea \
WHERE {cursor} \
ORDER BY e.{EVENT_SEQUENCE_NUMBER_STR} {order_clause} \
LIMIT {limit}
",
Hex::encode(tx_digest.into_inner()),
))
if descending_order {
query = query.order(events::event_sequence_number.desc());
} else {
query = query.order(events::event_sequence_number.asc());
}

query = query.filter(
events::tx_sequence_number.nullable().eq(tx_digests::table
.select(tx_digests::tx_sequence_number)
// we filter the tx_digests table because it is indexed by digest,
// events table is not
.filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
.single_value()),
);

let pool = self.get_pool();
let stored_events = run_query_async!(&pool, move |conn| {
query.limit(limit as i64).load::<StoredEvent>(conn)
})?;

let mut iota_event_futures = vec![];
for stored_event in stored_events {
iota_event_futures.push(tokio::task::spawn(
stored_event.try_into_iota_event(self.package_resolver.clone()),
));
}

let iota_events = futures::future::join_all(iota_event_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
Ok(iota_events)
}

pub async fn query_events_in_blocking_task(
Expand All @@ -1093,21 +1159,22 @@ impl<U: R2D2Connection> IndexerReader<U> {
transactions::dsl::transactions
.select(transactions::tx_sequence_number)
.filter(
transactions::dsl::transaction_digest.eq(tx_digest.into_inner().to_vec()),
transactions::tx_sequence_number
.nullable()
.eq(tx_digests::table
.select(tx_digests::tx_sequence_number)
// we filter the tx_digests table because it is indexed by digest,
// transactions table is not
.filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
.single_value()),
)
.first::<i64>(conn)
})?;
(tx_seq, event_seq)
(tx_seq, event_seq as i64)
} else if descending_order {
let max_tx_seq: i64 = run_query_async!(&pool, move |conn| {
events::dsl::events
.select(events::tx_sequence_number)
.order(events::dsl::tx_sequence_number.desc())
.first::<i64>(conn)
})
.map_or(-1, |max_tx_seq| max_tx_seq + 1);

(max_tx_seq, 0)
let max_tx_seq = i64::MAX;
let max_event_seq = i64::MAX;
(max_tx_seq, max_event_seq)
} else {
(-1, 0)
};
Expand Down Expand Up @@ -1147,7 +1214,9 @@ impl<U: R2D2Connection> IndexerReader<U> {
limit,
)
} else if let EventFilter::Transaction(tx_digest) = filter {
self.query_events_by_tx_digest_query(tx_digest, cursor, limit, descending_order)?
return self
.query_events_by_tx_digest(tx_digest, cursor, limit, descending_order)
.await;
} else {
let main_where_clause = match filter {
EventFilter::Package(package_id) => {
Expand Down

0 comments on commit 5c27c82

Please sign in to comment.