From 0fc16ba603e72bbbee93fcf65b4a05e90de41812 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Tue, 29 Oct 2024 08:50:33 -0700 Subject: [PATCH] [indexer-alt] Add ResetDatabase command (#20067) ## Description Adds a `ResetDatabase` subcommand to the indexer CLI that drops all tables, procedures, and functions in the database and then, unless `--skip-migrations` is passed, reruns the embedded migrations. `DbConfig` moves out of `IndexerConfig` into a top-level flag group shared by both subcommands, and the indexer now runs pending migrations at startup. ## Test plan Manually ran the new `reset-database` subcommand (with and without `--skip-migrations`) against a local database, and started the indexer to confirm pending migrations are applied at startup. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 1 + crates/sui-indexer-alt/Cargo.toml | 1 + crates/sui-indexer-alt/src/args.rs | 20 +++++- crates/sui-indexer-alt/src/db.rs | 97 +++++++++++++++++++++++++++++- crates/sui-indexer-alt/src/lib.rs | 17 ++++-- crates/sui-indexer-alt/src/main.rs | 37 +++++++----- 6 files changed, 149 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80e1dc6c34a09..8d865d25a6bf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13907,6 +13907,7 @@ dependencies = [ "clap", "diesel", "diesel-async", + "diesel_migrations", "futures", "mysten-metrics", "prometheus", diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml index c80b0b45808d5..a9c0c69826cda 100644 --- a/crates/sui-indexer-alt/Cargo.toml +++ b/crates/sui-indexer-alt/Cargo.toml @@ -31,6 +31,7 @@ tokio-stream.workspace = true tokio-util.workspace = true tracing.workspace = true url.workspace = true +diesel_migrations.workspace = true mysten-metrics.workspace = true sui-storage.workspace = true diff --git a/crates/sui-indexer-alt/src/args.rs b/crates/sui-indexer-alt/src/args.rs index f094f377e437f..cf688a5b7acf6 100644 --- a/crates/sui-indexer-alt/src/args.rs +++ 
b/crates/sui-indexer-alt/src/args.rs @@ -1,10 +1,28 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::db::DbConfig; use crate::IndexerConfig; +use clap::Subcommand; #[derive(clap::Parser, Debug, Clone)] pub struct Args { #[command(flatten)] - pub indexer_config: IndexerConfig, + pub db_config: DbConfig, + + #[command(subcommand)] + pub command: Command, +} + +#[allow(clippy::large_enum_variant)] +#[derive(Subcommand, Clone, Debug)] +pub enum Command { + /// Run the indexer. + Indexer(IndexerConfig), + ResetDatabase { + /// If true, only drop all tables but do not run the migrations. + /// That is, no tables will exist in the DB after the reset. + #[clap(long, default_value_t = false)] + skip_migrations: bool, + }, } diff --git a/crates/sui-indexer-alt/src/db.rs b/crates/sui-indexer-alt/src/db.rs index 99fe94c9fb7fb..cc2658099c742 100644 --- a/crates/sui-indexer-alt/src/db.rs +++ b/crates/sui-indexer-alt/src/db.rs @@ -1,17 +1,23 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use std::time::Duration; - +use anyhow::anyhow; +use diesel::migration::MigrationVersion; +use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::{ pooled_connection::{ bb8::{Pool, PooledConnection, RunError}, AsyncDieselConnectionManager, PoolError, }, - AsyncPgConnection, + AsyncPgConnection, RunQueryDsl, }; +use diesel_migrations::{embed_migrations, EmbeddedMigrations}; +use std::time::Duration; +use tracing::info; use url::Url; +const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); + #[derive(Clone)] pub struct Db { pool: Pool<AsyncPgConnection>, @@ -64,4 +70,89 @@ impl Db { pub(crate) fn state(&self) -> bb8::State { self.pool.state() } + + async fn clear_database(&self) -> Result<(), anyhow::Error> { + info!("Clearing the database..."); + let mut conn = self.connect().await?; + let drop_all_tables = " + DO $$ DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public') + LOOP + EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE'; + END LOOP; + END $$;"; + diesel::sql_query(drop_all_tables) + .execute(&mut conn) + .await?; + info!("Dropped all tables."); + + let drop_all_procedures = " + DO $$ DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes + FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid) + WHERE ns.nspname = 'public' AND prokind = 'p') + LOOP + EXECUTE 'DROP PROCEDURE IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE'; + END LOOP; + END $$;"; + diesel::sql_query(drop_all_procedures) + .execute(&mut conn) + .await?; + info!("Dropped all procedures."); + + let drop_all_functions = " + DO $$ DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes + FROM pg_proc INNER JOIN pg_namespace ON (pg_proc.pronamespace = pg_namespace.oid) + WHERE pg_namespace.nspname = 'public' AND prokind = 'f') + LOOP + 
EXECUTE 'DROP FUNCTION IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE'; + END LOOP; + END $$;"; + diesel::sql_query(drop_all_functions) + .execute(&mut conn) + .await?; + info!("Database cleared."); + Ok(()) + } + + pub(crate) async fn run_migrations( + &self, + ) -> Result<Vec<MigrationVersion<'static>>, anyhow::Error> { + use diesel_migrations::MigrationHarness; + + info!("Running migrations ..."); + let conn = self.pool.dedicated_connection().await?; + let mut wrapper: AsyncConnectionWrapper<AsyncPgConnection> = + diesel_async::async_connection_wrapper::AsyncConnectionWrapper::from(conn); + + let finished_migrations = tokio::task::spawn_blocking(move || { + wrapper + .run_pending_migrations(MIGRATIONS) + .map(|versions| versions.iter().map(MigrationVersion::as_owned).collect()) + }) + .await? + .map_err(|e| anyhow!("Failed to run migrations: {:?}", e))?; + info!("Migrations complete."); + Ok(finished_migrations) + } +} + +/// Drop all tables and rerun migrations. +pub async fn reset_database( + db_config: DbConfig, + skip_migrations: bool, +) -> Result<(), anyhow::Error> { + let db = Db::new(db_config).await?; + db.clear_database().await?; + if !skip_migrations { + db.run_migrations().await?; + } + Ok(()) } diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index e6e9f1e8e3ac4..5121e619140d1 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -67,9 +67,6 @@ pub struct IndexerConfig #[command(flatten)] pub ingestion_config: IngestionConfig, - #[command(flatten)] - pub db_config: DbConfig, - #[command(flatten)] pub pipeline_config: PipelineConfig, @@ -95,21 +92,29 @@ pub struct IndexerConfig } impl Indexer { - pub async fn new(config: IndexerConfig, cancel: CancellationToken) -> Result<Self> { + pub async fn new( + db_config: DbConfig, + indexer_config: IndexerConfig, + cancel: CancellationToken, + ) -> Result<Self> { let IndexerConfig { ingestion_config, - db_config, pipeline_config, first_checkpoint, last_checkpoint, 
pipeline, metrics_address, - } = config; + } = indexer_config; let db = Db::new(db_config) .await .context("Failed to connect to database")?; + // At indexer initialization, we ensure that the DB schema is up-to-date. + db.run_migrations() + .await + .context("Failed to run pending migrations")?; + let (metrics, metrics_service) = MetricsService::new(metrics_address, db.clone(), cancel.clone())?; let ingestion_service = diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 77d90d3fe0cb3..11de9a787181d 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -3,6 +3,8 @@ use anyhow::{Context, Result}; use clap::Parser; +use sui_indexer_alt::args::Command; +use sui_indexer_alt::db::reset_database; use sui_indexer_alt::{ args::Args, handlers::{ @@ -25,20 +27,27 @@ async fn main() -> Result<()> { let cancel = CancellationToken::new(); - let mut indexer = Indexer::new(args.indexer_config, cancel.clone()).await?; - - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - indexer.concurrent_pipeline::().await?; - - let h_indexer = indexer.run().await.context("Failed to start indexer")?; - - cancel.cancelled().await; - let _ = h_indexer.await; + match args.command { + Command::Indexer(indexer_config) => { + let mut indexer = Indexer::new(args.db_config, indexer_config, cancel.clone()).await?; + + indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; + + let h_indexer = indexer.run().await.context("Failed to start indexer")?; + + cancel.cancelled().await; + let _ = 
h_indexer.await; + } + Command::ResetDatabase { skip_migrations } => { + reset_database(args.db_config, skip_migrations).await?; + } + } Ok(()) }