Skip to content

Commit

Permalink
[indexer-alt] Add ResetDatabase command (#20067)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Oct 29, 2024
1 parent aec94f8 commit 0fc16ba
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
url.workspace = true
diesel_migrations.workspace = true

mysten-metrics.workspace = true
sui-storage.workspace = true
Expand Down
20 changes: 19 additions & 1 deletion crates/sui-indexer-alt/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::db::DbConfig;
use crate::IndexerConfig;
use clap::Subcommand;

#[derive(clap::Parser, Debug, Clone)]
pub struct Args {
#[command(flatten)]
pub indexer_config: IndexerConfig,
pub db_config: DbConfig,

#[command(subcommand)]
pub command: Command,
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Clone, Debug)]
pub enum Command {
/// Run the indexer.
Indexer(IndexerConfig),
ResetDatabase {
/// If true, only drop all tables but do not run the migrations.
/// That is, no tables will exist in the DB after the reset.
#[clap(long, default_value_t = false)]
skip_migrations: bool,
},
}
97 changes: 94 additions & 3 deletions crates/sui-indexer-alt/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use anyhow::anyhow;
use diesel::migration::MigrationVersion;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::{
pooled_connection::{
bb8::{Pool, PooledConnection, RunError},
AsyncDieselConnectionManager, PoolError,
},
AsyncPgConnection,
AsyncPgConnection, RunQueryDsl,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use std::time::Duration;
use tracing::info;
use url::Url;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

#[derive(Clone)]
pub struct Db {
pool: Pool<AsyncPgConnection>,
Expand Down Expand Up @@ -64,4 +70,89 @@ impl Db {
pub(crate) fn state(&self) -> bb8::State {
self.pool.state()
}

async fn clear_database(&self) -> Result<(), anyhow::Error> {
info!("Clearing the database...");
let mut conn = self.connect().await?;
let drop_all_tables = "
DO $$ DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public')
LOOP
EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
END LOOP;
END $$;";
diesel::sql_query(drop_all_tables)
.execute(&mut conn)
.await?;
info!("Dropped all tables.");

let drop_all_procedures = "
DO $$ DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes
FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
WHERE ns.nspname = 'public' AND prokind = 'p')
LOOP
EXECUTE 'DROP PROCEDURE IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
END LOOP;
END $$;";
diesel::sql_query(drop_all_procedures)
.execute(&mut conn)
.await?;
info!("Dropped all procedures.");

let drop_all_functions = "
DO $$ DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes
FROM pg_proc INNER JOIN pg_namespace ON (pg_proc.pronamespace = pg_namespace.oid)
WHERE pg_namespace.nspname = 'public' AND prokind = 'f')
LOOP
EXECUTE 'DROP FUNCTION IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
END LOOP;
END $$;";
diesel::sql_query(drop_all_functions)
.execute(&mut conn)
.await?;
info!("Database cleared.");
Ok(())
}

pub(crate) async fn run_migrations(
&self,
) -> Result<Vec<MigrationVersion<'static>>, anyhow::Error> {
use diesel_migrations::MigrationHarness;

info!("Running migrations ...");
let conn = self.pool.dedicated_connection().await?;
let mut wrapper: AsyncConnectionWrapper<AsyncPgConnection> =
diesel_async::async_connection_wrapper::AsyncConnectionWrapper::from(conn);

let finished_migrations = tokio::task::spawn_blocking(move || {
wrapper
.run_pending_migrations(MIGRATIONS)
.map(|versions| versions.iter().map(MigrationVersion::as_owned).collect())
})
.await?
.map_err(|e| anyhow!("Failed to run migrations: {:?}", e))?;
info!("Migrations complete.");
Ok(finished_migrations)
}
}

/// Drop all tables and rerunning migrations.
pub async fn reset_database(
db_config: DbConfig,
skip_migrations: bool,
) -> Result<(), anyhow::Error> {
let db = Db::new(db_config).await?;
db.clear_database().await?;
if !skip_migrations {
db.run_migrations().await?;
}
Ok(())
}
17 changes: 11 additions & 6 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ pub struct IndexerConfig {
#[command(flatten)]
pub ingestion_config: IngestionConfig,

#[command(flatten)]
pub db_config: DbConfig,

#[command(flatten)]
pub pipeline_config: PipelineConfig,

Expand All @@ -95,21 +92,29 @@ pub struct IndexerConfig {
}

impl Indexer {
pub async fn new(config: IndexerConfig, cancel: CancellationToken) -> Result<Self> {
pub async fn new(
db_config: DbConfig,
indexer_config: IndexerConfig,
cancel: CancellationToken,
) -> Result<Self> {
let IndexerConfig {
ingestion_config,
db_config,
pipeline_config,
first_checkpoint,
last_checkpoint,
pipeline,
metrics_address,
} = config;
} = indexer_config;

let db = Db::new(db_config)
.await
.context("Failed to connect to database")?;

// At indexer initialization, we ensure that the DB schema is up-to-date.
db.run_migrations()
.await
.context("Failed to run pending migrations")?;

let (metrics, metrics_service) =
MetricsService::new(metrics_address, db.clone(), cancel.clone())?;
let ingestion_service =
Expand Down
37 changes: 23 additions & 14 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use anyhow::{Context, Result};
use clap::Parser;
use sui_indexer_alt::args::Command;
use sui_indexer_alt::db::reset_database;
use sui_indexer_alt::{
args::Args,
handlers::{
Expand All @@ -25,20 +27,27 @@ async fn main() -> Result<()> {

let cancel = CancellationToken::new();

let mut indexer = Indexer::new(args.indexer_config, cancel.clone()).await?;

indexer.concurrent_pipeline::<EvEmitMod>().await?;
indexer.concurrent_pipeline::<EvStructInst>().await?;
indexer.concurrent_pipeline::<KvCheckpoints>().await?;
indexer.concurrent_pipeline::<KvObjects>().await?;
indexer.concurrent_pipeline::<KvTransactions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

cancel.cancelled().await;
let _ = h_indexer.await;
match args.command {
Command::Indexer(indexer_config) => {
let mut indexer = Indexer::new(args.db_config, indexer_config, cancel.clone()).await?;

indexer.concurrent_pipeline::<EvEmitMod>().await?;
indexer.concurrent_pipeline::<EvStructInst>().await?;
indexer.concurrent_pipeline::<KvCheckpoints>().await?;
indexer.concurrent_pipeline::<KvObjects>().await?;
indexer.concurrent_pipeline::<KvTransactions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

cancel.cancelled().await;
let _ = h_indexer.await;
}
Command::ResetDatabase { skip_migrations } => {
reset_database(args.db_config, skip_migrations).await?;
}
}

Ok(())
}

0 comments on commit 0fc16ba

Please sign in to comment.