diff --git a/crates/iota-indexer/src/indexer_reader.rs b/crates/iota-indexer/src/indexer_reader.rs
index 1fa26c1969c..2ecf20f90bf 100644
--- a/crates/iota-indexer/src/indexer_reader.rs
+++ b/crates/iota-indexer/src/indexer_reader.rs
@@ -10,7 +10,8 @@ use std::{
 use anyhow::{Result, anyhow};
 use cached::{Cached, SizedCache};
 use diesel::{
-    ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, TextExpressionMethods,
+    ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, QueryDsl,
+    RunQueryDsl, SelectableHelper, TextExpressionMethods,
     dsl::sql,
     r2d2::{ConnectionManager, R2D2Connection},
     sql_types::Bool,
@@ -63,7 +64,7 @@ use crate::{
     },
     schema::{
         address_metrics, checkpoints, display, epochs, events, move_call_metrics, objects,
-        objects_snapshot, packages, transactions,
+        objects_snapshot, packages, pruner_cp_watermark, transactions, tx_digests,
     },
     store::{diesel_macro::*, package_resolver::IndexerStorePackageResolver},
     types::{IndexerResult, OwnerType},
@@ -461,7 +462,16 @@ impl IndexerReader {
     ) -> Result {
         let mut stored_txn: StoredTransaction = run_query!(&self.pool, |conn| {
             transactions::table
-                .filter(transactions::transaction_digest.eq(digest.into_inner().to_vec()))
+                .filter(
+                    transactions::tx_sequence_number
+                        .nullable()
+                        .eq(tx_digests::table
+                            .select(tx_digests::tx_sequence_number)
+                            // we filter the tx_digests table because it is indexed by digest,
+                            // transactions table is not
+                            .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
+                            .single_value()),
+                )
                 .first::<StoredTransaction>(conn)
         })?;

@@ -497,7 +507,14 @@ impl IndexerReader {
             .collect::<Vec<_>>();
         let transactions = run_query!(&self.pool, |conn| {
             transactions::table
-                .filter(transactions::transaction_digest.eq_any(digests))
+                .inner_join(
+                    tx_digests::table
+                        .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
+                )
+                // we filter the tx_digests table because it is indexed by digest,
+                // transactions table is not
+                .filter(tx_digests::tx_digest.eq_any(digests))
+                .select(StoredTransaction::as_select())
                 .load::<StoredTransaction>(conn)
         })?;
         if cfg!(feature = "postgres-feature") {
@@ -722,8 +739,21 @@ impl IndexerReader {
         limit: usize,
         is_descending: bool,
     ) -> IndexerResult> {
+        let pool = self.get_pool();
+        let tx_range: (i64, i64) = run_query_async!(&pool, move |conn| {
+            pruner_cp_watermark::dsl::pruner_cp_watermark
+                .select((
+                    pruner_cp_watermark::min_tx_sequence_number,
+                    pruner_cp_watermark::max_tx_sequence_number,
+                ))
+                // we filter the pruner_cp_watermark table because it is indexed by
+                // checkpoint_sequence_number, transactions is not
+                .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
+                .first::<(i64, i64)>(conn)
+        })?;
+
         let mut query = transactions::dsl::transactions
-            .filter(transactions::dsl::checkpoint_sequence_number.eq(checkpoint_seq as i64))
+            .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
             .into_boxed();

         // Translate transaction digest cursor to tx sequence number
@@ -778,9 +808,11 @@ impl IndexerReader {
         let cursor_tx_seq = if let Some(cursor) = cursor {
             let pool = self.get_pool();
             let tx_seq = run_query_async!(&pool, move |conn| {
-                transactions::dsl::transactions
-                    .select(transactions::tx_sequence_number)
-                    .filter(transactions::dsl::transaction_digest.eq(cursor.into_inner().to_vec()))
+                tx_digests::table
+                    .select(tx_digests::tx_sequence_number)
+                    // we filter the tx_digests table because it is indexed by digest,
+                    // transactions (and other tables) are not
+                    .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
                     .first::<i64>(conn)
             })?;
             Some(tx_seq)
@@ -1019,7 +1051,16 @@ impl IndexerReader {
         let pool = self.get_pool();
         let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
             transactions::table
-                .filter(transactions::transaction_digest.eq(digest.into_inner().to_vec()))
+                .filter(
+                    transactions::tx_sequence_number
+                        .nullable()
+                        .eq(tx_digests::table
+                            .select(tx_digests::tx_sequence_number)
+                            // we filter the tx_digests table because it is indexed by digest,
+                            // transactions table is not
+                            .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
+                            .single_value()),
+                )
                 .select((transactions::timestamp_ms, transactions::events))
                 .first::<(i64, StoredTransactionEvents)>(conn)
         })?;
@@ -1037,43 +1078,68 @@ impl IndexerReader {
         Ok(iota_tx_events.map_or(vec![], |ste| ste.data))
     }

-    fn query_events_by_tx_digest_query(
+    async fn query_events_by_tx_digest(
        &self,
        tx_digest: TransactionDigest,
        cursor: Option<EventID>,
        limit: usize,
        descending_order: bool,
-    ) -> IndexerResult<String> {
-        let cursor = if let Some(cursor) = cursor {
+    ) -> IndexerResult<Vec<IotaEvent>> {
+        let mut query = events::table.into_boxed();
+
+        if let Some(cursor) = cursor {
             if cursor.tx_digest != tx_digest {
                 return Err(IndexerError::InvalidArgument(
                     "Cursor tx_digest does not match the tx_digest in the query.".into(),
                 ));
             }
             if descending_order {
-                format!("e.{EVENT_SEQUENCE_NUMBER_STR} < {}", cursor.event_seq)
+                query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
             } else {
-                format!("e.{EVENT_SEQUENCE_NUMBER_STR} > {}", cursor.event_seq)
+                query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
             }
         } else if descending_order {
-            format!("e.{EVENT_SEQUENCE_NUMBER_STR} <= {}", i64::MAX)
+            query = query.filter(events::event_sequence_number.le(i64::MAX));
         } else {
-            format!("e.{EVENT_SEQUENCE_NUMBER_STR} >= {}", 0)
+            query = query.filter(events::event_sequence_number.ge(0));
         };

-        let order_clause = if descending_order { "DESC" } else { "ASC" };
-        Ok(format!(
-            "SELECT * \
-            FROM EVENTS e \
-            JOIN TRANSACTIONS t \
-            ON t.tx_sequence_number = e.tx_sequence_number \
-            AND t.transaction_digest = '\\x{}'::bytea \
-            WHERE {cursor} \
-            ORDER BY e.{EVENT_SEQUENCE_NUMBER_STR} {order_clause} \
-            LIMIT {limit}
-            ",
-            Hex::encode(tx_digest.into_inner()),
-        ))
+        if descending_order {
+            query = query.order(events::event_sequence_number.desc());
+        } else {
+            query = query.order(events::event_sequence_number.asc());
+        }
+
+        query = query.filter(
+            events::tx_sequence_number.nullable().eq(tx_digests::table
+                .select(tx_digests::tx_sequence_number)
+                // we filter the tx_digests table because it is indexed by digest,
+                // events table is not
+                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
+                .single_value()),
+        );
+
+        let pool = self.get_pool();
+        let stored_events = run_query_async!(&pool, move |conn| {
+            query.limit(limit as i64).load::<StoredEvent>(conn)
+        })?;
+
+        let mut iota_event_futures = vec![];
+        for stored_event in stored_events {
+            iota_event_futures.push(tokio::task::spawn(
+                stored_event.try_into_iota_event(self.package_resolver.clone()),
+            ));
+        }
+
+        let iota_events = futures::future::join_all(iota_event_futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .tap_err(|e| tracing::error!("Failed to join iota event futures: {}", e))?
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .tap_err(|e| tracing::error!("Failed to collect iota event futures: {}", e))?;
+        Ok(iota_events)
     }

     pub async fn query_events_in_blocking_task(
@@ -1093,21 +1159,22 @@ impl IndexerReader {
                 transactions::dsl::transactions
                     .select(transactions::tx_sequence_number)
                     .filter(
-                        transactions::dsl::transaction_digest.eq(tx_digest.into_inner().to_vec()),
+                        transactions::tx_sequence_number
+                            .nullable()
+                            .eq(tx_digests::table
+                                .select(tx_digests::tx_sequence_number)
+                                // we filter the tx_digests table because it is indexed by digest,
+                                // transactions table is not
+                                .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
+                                .single_value()),
                     )
                     .first::<i64>(conn)
             })?;
-            (tx_seq, event_seq)
+            (tx_seq, event_seq as i64)
         } else if descending_order {
-            let max_tx_seq: i64 = run_query_async!(&pool, move |conn| {
-                events::dsl::events
-                    .select(events::tx_sequence_number)
-                    .order(events::dsl::tx_sequence_number.desc())
-                    .first::<i64>(conn)
-            })
-            .map_or(-1, |max_tx_seq| max_tx_seq + 1);
-
-            (max_tx_seq, 0)
+            let max_tx_seq = i64::MAX;
+            let max_event_seq = i64::MAX;
+            (max_tx_seq, max_event_seq)
         } else {
             (-1, 0)
         };
@@ -1147,7 +1214,9 @@ impl IndexerReader {
                 limit,
             )
         } else if let EventFilter::Transaction(tx_digest) = filter {
-            self.query_events_by_tx_digest_query(tx_digest, cursor, limit, descending_order)?
+            return self
+                .query_events_by_tx_digest(tx_digest, cursor, limit, descending_order)
+                .await;
         } else {
             let main_where_clause = match filter {
                 EventFilter::Package(package_id) => {
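Note (not part of the diff): every hunk above applies the same idea, namely resolving a transaction digest through a table that is actually indexed by that digest (tx_digests, or pruner_cp_watermark for checkpoint sequence numbers) and then filtering the target table by tx_sequence_number, instead of scanning the unindexed transactions.transaction_digest column. The sketch below shows the Diesel pattern in isolation. It is a minimal, hypothetical example: the table! definitions, column types, and the helper name tx_seq_by_digest are simplified stand-ins, while the real schema and the run_query!/run_query_async! wrappers live in the iota-indexer crate.

// Standalone sketch of the digest-indexed lookup pattern used in the diff.
use diesel::{PgConnection, prelude::*};

diesel::table! {
    // Simplified: the real schema has many more columns.
    transactions (tx_sequence_number) {
        tx_sequence_number -> Int8,
        transaction_digest -> Binary,
    }
}

diesel::table! {
    // tx_digests is keyed (and therefore indexed) by the digest itself.
    tx_digests (tx_digest) {
        tx_digest -> Binary,
        tx_sequence_number -> Int8,
    }
}

// Required so tx_digests can appear as a subquery inside a transactions query.
diesel::allow_tables_to_appear_in_same_query!(transactions, tx_digests);

/// Hypothetical helper: resolve a digest through tx_digests as a scalar
/// subquery (`single_value()`), then filter transactions by the sequence
/// number it yields, avoiding a scan over transactions.transaction_digest.
fn tx_seq_by_digest(conn: &mut PgConnection, digest: Vec<u8>) -> QueryResult<Option<i64>> {
    transactions::table
        .filter(
            transactions::tx_sequence_number
                .nullable()
                .eq(tx_digests::table
                    .select(tx_digests::tx_sequence_number)
                    .filter(tx_digests::tx_digest.eq(digest))
                    .single_value()),
        )
        .select(transactions::tx_sequence_number)
        .first::<i64>(conn)
        .optional()
}

The same shape appears three ways in the diff: as a scalar subquery (.single_value()) when a single row is wanted, as an inner_join on tx_sequence_number when many digests are resolved at once (eq_any), and as a (min, max) range from pruner_cp_watermark when the lookup key is a checkpoint sequence number rather than a digest.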