Skip to content

Commit

Permalink
[mvr] indexer schema subset and reduced persist tasks (#20100)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Oct 31, 2024
1 parent 7c517e7 commit c33374e
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) struct NamedMovePackage;

impl NamedMovePackage {
/// Queries a package by name (and version, encoded in the name but optional).
/// Name's format should be `{organization}/{application}:v{version}`.
/// Name's format should be `{organization}/{application}/{version}`.
pub(crate) async fn query(
ctx: &Context<'_>,
name: &str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ CREATE TABLE objects_history (
CONSTRAINT objects_history_pk PRIMARY KEY (checkpoint_sequence_number, object_id, object_version)
) PARTITION BY RANGE (checkpoint_sequence_number);
CREATE INDEX objects_history_id_version ON objects_history (object_id, object_version, checkpoint_sequence_number);
CREATE INDEX objects_history_owner ON objects_history (checkpoint_sequence_number, owner_type, owner_id) WHERE owner_type BETWEEN 1 AND 2 AND owner_id IS NOT NULL;
CREATE INDEX objects_history_coin_owner ON objects_history (checkpoint_sequence_number, owner_id, coin_type, object_id) WHERE coin_type IS NOT NULL AND owner_type = 1;
CREATE INDEX objects_history_coin_only ON objects_history (checkpoint_sequence_number, coin_type, object_id) WHERE coin_type IS NOT NULL;
CREATE INDEX objects_history_type ON objects_history (checkpoint_sequence_number, object_type);
CREATE INDEX objects_history_package_module_name_full_type ON objects_history (checkpoint_sequence_number, object_type_package, object_type_module, object_type_name, object_type);
CREATE INDEX objects_history_owner_package_module_name_full_type ON objects_history (checkpoint_sequence_number, owner_id, object_type_package, object_type_module, object_type_name, object_type);
-- init with first partition of the history table
CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);

Expand Down Expand Up @@ -99,9 +93,3 @@ CREATE TABLE objects_snapshot (
df_object_id bytea
);
CREATE INDEX objects_snapshot_checkpoint_sequence_number ON objects_snapshot (checkpoint_sequence_number);
CREATE INDEX objects_snapshot_owner ON objects_snapshot (owner_type, owner_id, object_id) WHERE owner_type BETWEEN 1 AND 2 AND owner_id IS NOT NULL;
CREATE INDEX objects_snapshot_coin_owner ON objects_snapshot (owner_id, coin_type, object_id) WHERE coin_type IS NOT NULL AND owner_type = 1;
CREATE INDEX objects_snapshot_coin_only ON objects_snapshot (coin_type, object_id) WHERE coin_type IS NOT NULL;
CREATE INDEX objects_snapshot_type_id ON objects_snapshot (object_type_package, object_type_module, object_type_name, object_type, object_id);
CREATE INDEX objects_snapshot_id_type ON objects_snapshot (object_id, object_type_package, object_type_module, object_type_name, object_type);
CREATE INDEX objects_snapshot_owner_package_module_name_full_type ON objects_snapshot (owner_id, object_type_package, object_type_module, object_type_name, object_type);
28 changes: 0 additions & 28 deletions crates/sui-mvr-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tracing::instrument;
use tracing::{error, info};

use crate::metrics::IndexerMetrics;
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::store::IndexerStore;
use crate::types::IndexerResult;

Expand Down Expand Up @@ -170,42 +169,15 @@ async fn commit_checkpoints<S>(

let guard = metrics.checkpoint_db_commit_latency.start_timer();
let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::<Vec<_>>();
let events_batch = events_batch.into_iter().flatten().collect::<Vec<_>>();
let event_indices_batch = event_indices_batch
.into_iter()
.flatten()
.collect::<Vec<_>>();
let object_versions_batch = object_versions_batch
.into_iter()
.flatten()
.collect::<Vec<_>>();
let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
let checkpoint_num = checkpoint_batch.len();
let tx_count = tx_batch.len();
let raw_checkpoints_batch = checkpoint_batch
.iter()
.map(|c| c.into())
.collect::<Vec<StoredRawCheckpoint>>();

{
let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
let mut persist_tasks = vec![
state.persist_transactions(tx_batch),
state.persist_tx_indices(tx_indices_batch),
state.persist_events(events_batch),
state.persist_event_indices(event_indices_batch),
state.persist_displays(display_updates_batch),
state.persist_packages(packages_batch),
// TODO: There are a few ways we could make the following more memory efficient.
// 1. persist_objects and persist_object_history both call another function to make the final
// committed object list. We could call it early and share the result.
// 2. We could avoid clone by using Arc.
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_full_objects_history(object_history_changes_batch.clone()),
state.persist_objects_version(object_versions_batch.clone()),
state.persist_raw_checkpoints(raw_checkpoints_batch),
];
if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
Expand Down
103 changes: 2 additions & 101 deletions crates/sui-mvr-indexer/tests/ingestion_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ use diesel_async::RunQueryDsl;
use simulacrum::Simulacrum;
use sui_mvr_indexer::errors::IndexerError;
use sui_mvr_indexer::handlers::TransactionObjectChangesToCommit;
use sui_mvr_indexer::models::{
checkpoints::StoredCheckpoint, objects::StoredObject, objects::StoredObjectSnapshot,
transactions::StoredTransaction,
};
use sui_mvr_indexer::schema::{checkpoints, objects, objects_snapshot, transactions};
use sui_mvr_indexer::models::{checkpoints::StoredCheckpoint, objects::StoredObjectSnapshot};
use sui_mvr_indexer::schema::{checkpoints, objects_snapshot};
use sui_mvr_indexer::store::indexer_store::IndexerStore;
use sui_mvr_indexer::test_utils::{
set_up, set_up_with_start_and_end_checkpoints, wait_for_checkpoint, wait_for_objects_snapshot,
Expand All @@ -22,57 +19,8 @@ use sui_mvr_indexer::types::IndexedDeletedObject;
use sui_mvr_indexer::types::IndexedObject;
use sui_mvr_indexer::types::TxIndex;
use sui_types::base_types::SuiAddress;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::gas_coin::GasCoin;
use sui_types::SUI_FRAMEWORK_PACKAGE_ID;
use tempfile::tempdir;

#[tokio::test]
pub async fn test_transaction_table() -> Result<(), IndexerError> {
let tempdir = tempdir().unwrap();
let mut sim = Simulacrum::new();
let data_ingestion_path = tempdir.path().to_path_buf();
sim.set_data_ingestion_path(data_ingestion_path.clone());

// Execute a simple transaction.
let transfer_recipient = SuiAddress::random_for_testing_only();
let (transaction, _) = sim.transfer_txn(transfer_recipient);
let (effects, err) = sim.execute_transaction(transaction.clone()).unwrap();
assert!(err.is_none());

// Create a checkpoint which should include the transaction we executed.
let checkpoint = sim.create_checkpoint();

let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await;

// Wait for the indexer to catch up to the checkpoint.
wait_for_checkpoint(&pg_store, 1).await?;

let digest = effects.transaction_digest();

// Read the transaction from the database directly.
let mut connection = pg_store.pool().dedicated_connection().await.unwrap();
let db_txn: StoredTransaction = transactions::table
.filter(transactions::transaction_digest.eq(digest.inner().to_vec()))
.first::<StoredTransaction>(&mut connection)
.await
.expect("Failed reading transaction from PostgresDB");

// Check that the transaction was stored correctly.
assert_eq!(db_txn.tx_sequence_number, 1);
assert_eq!(db_txn.transaction_digest, digest.inner().to_vec());
assert_eq!(
db_txn.raw_transaction,
bcs::to_bytes(&transaction.data()).unwrap()
);
assert_eq!(db_txn.raw_effects, bcs::to_bytes(&effects).unwrap());
assert_eq!(db_txn.timestamp_ms, checkpoint.timestamp_ms as i64);
assert_eq!(db_txn.checkpoint_sequence_number, 1);
assert_eq!(db_txn.transaction_kind, 1);
assert_eq!(db_txn.success_command_count, 2); // split coin + transfer
Ok(())
}

#[tokio::test]
pub async fn test_checkpoint_range_ingestion() -> Result<(), IndexerError> {
let tempdir = tempdir().unwrap();
Expand Down Expand Up @@ -139,53 +87,6 @@ pub async fn test_checkpoint_range_ingestion() -> Result<(), IndexerError> {
Ok(())
}

#[tokio::test]
pub async fn test_object_type() -> Result<(), IndexerError> {
let tempdir = tempdir().unwrap();
let mut sim = Simulacrum::new();
let data_ingestion_path = tempdir.path().to_path_buf();
sim.set_data_ingestion_path(data_ingestion_path.clone());

// Execute a simple transaction.
let transfer_recipient = SuiAddress::random_for_testing_only();
let (transaction, _) = sim.transfer_txn(transfer_recipient);
let (_, err) = sim.execute_transaction(transaction.clone()).unwrap();
assert!(err.is_none());

// Create a checkpoint which should include the transaction we executed.
let _ = sim.create_checkpoint();

let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await;

// Wait for the indexer to catch up to the checkpoint.
wait_for_checkpoint(&pg_store, 1).await?;

let obj_id = transaction.gas()[0].0;

// Read the transaction from the database directly.
let mut connection = pg_store.pool().dedicated_connection().await.unwrap();
let db_object: StoredObject = objects::table
.filter(objects::object_id.eq(obj_id.to_vec()))
.first::<StoredObject>(&mut connection)
.await
.expect("Failed reading object from PostgresDB");

let obj_type_tag = GasCoin::type_();

// Check that the different components of the event type were stored correctly.
assert_eq!(
db_object.object_type,
Some(obj_type_tag.to_canonical_string(true))
);
assert_eq!(
db_object.object_type_package,
Some(SUI_FRAMEWORK_PACKAGE_ID.to_vec())
);
assert_eq!(db_object.object_type_module, Some("coin".to_string()));
assert_eq!(db_object.object_type_name, Some("Coin".to_string()));
Ok(())
}

#[tokio::test]
pub async fn test_objects_snapshot() -> Result<(), IndexerError> {
let tempdir = tempdir().unwrap();
Expand Down

0 comments on commit c33374e

Please sign in to comment.