diff --git a/Cargo.lock b/Cargo.lock index e150dafeeb..22c91257f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5808,6 +5808,7 @@ name = "pindexer" version = "0.79.0" dependencies = [ "anyhow", + "clap", "cometindex", "penumbra-app", "penumbra-asset", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index e32566c4c4..64cf3204bc 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -11,15 +11,16 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = {workspace = true} +clap = {workspace = true} +cometindex = {workspace = true} +penumbra-shielded-pool = {workspace = true, default-features = false} +penumbra-stake = {workspace = true, default-features = false} +penumbra-app = {workspace = true, default-features = false} +penumbra-num = {workspace = true, default-features = false} +penumbra-asset = {workspace = true, default-features = false} +penumbra-proto = {workspace = true, default-features = false} +tokio = {workspace = true, features = ["full"]} +serde_json = {workspace = true} sqlx = { workspace = true, features = ["chrono"] } -cometindex = { workspace = true } -penumbra-shielded-pool = { workspace = true, default-features = false } -penumbra-stake = { workspace = true, default-features = false } -penumbra-app = { workspace = true, default-features = false } -penumbra-num = { workspace = true, default-features = false } -penumbra-asset = { workspace = true, default-features = false } -penumbra-proto = { workspace = true, default-features = false } -tokio = { workspace = true, features = ["full"] } -anyhow = { workspace = true } -serde_json = { workspace = true } -tracing = { workspace = true } +tracing = {workspace = true} diff --git a/crates/bin/pindexer/src/block.rs b/crates/bin/pindexer/src/block.rs index 585d78f95e..221fcfb188 100644 --- a/crates/bin/pindexer/src/block.rs +++ b/crates/bin/pindexer/src/block.rs @@ -1,6 +1,6 @@ use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent}; -use sqlx::types::chrono::DateTime; +use sqlx::{types::chrono::DateTime, PgPool}; #[derive(Debug)] pub struct Block {} @@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS block_details ( &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventBlockRoot::from_event(event.as_ref())?; let timestamp = pe.timestamp.expect("Block has no timestamp"); diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index bacfb8f57c..0f942bc374 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -1,4 +1,4 @@ -pub use cometindex::{AppView, Indexer}; +pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool, PgTransaction}; mod indexer_ext; pub use indexer_ext::IndexerExt; diff --git a/crates/bin/pindexer/src/main.rs b/crates/bin/pindexer/src/main.rs index 8653858351..71989105ff 100644 --- a/crates/bin/pindexer/src/main.rs +++ b/crates/bin/pindexer/src/main.rs @@ -1,10 +1,11 @@ use anyhow::Result; +use clap::Parser as _; use pindexer::block::Block; -use pindexer::{Indexer, IndexerExt as _}; +use pindexer::{Indexer, IndexerExt as _, Options}; #[tokio::main] async fn main() -> Result<()> { - Indexer::new() + Indexer::new(Options::parse()) .with_default_tracing() .with_default_penumbra_app_views() .with_index(Block {}) diff --git a/crates/bin/pindexer/src/shielded_pool/fmd.rs b/crates/bin/pindexer/src/shielded_pool/fmd.rs index c23a18144d..50f4942dd3 100644 --- a/crates/bin/pindexer/src/shielded_pool/fmd.rs +++ b/crates/bin/pindexer/src/shielded_pool/fmd.rs @@ -1,4 +1,4 @@ -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent}; #[derive(Debug)] @@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set ( &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventBroadcastClue::from_event(event.as_ref())?; diff --git a/crates/bin/pindexer/src/stake/delegation_txs.rs b/crates/bin/pindexer/src/stake/delegation_txs.rs index 8c04bb0b17..a8da474a4c 100644 --- a/crates/bin/pindexer/src/stake/delegation_txs.rs +++ b/crates/bin/pindexer/src/stake/delegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; @@ -54,6 +54,7 @@ impl AppView for DelegationTxs { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<()> { let pe = pb::EventDelegate::from_event(event.as_ref())?; diff --git a/crates/bin/pindexer/src/stake/missed_blocks.rs b/crates/bin/pindexer/src/stake/missed_blocks.rs index 59282c8182..7b5115792c 100644 --- a/crates/bin/pindexer/src/stake/missed_blocks.rs +++ b/crates/bin/pindexer/src/stake/missed_blocks.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; @@ -52,6 +52,7 @@ impl AppView for MissedBlocks { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?; let ik_bytes = pe diff --git a/crates/bin/pindexer/src/stake/slashings.rs b/crates/bin/pindexer/src/stake/slashings.rs index 19c3fe8281..1be89b6944 100644 --- a/crates/bin/pindexer/src/stake/slashings.rs +++ b/crates/bin/pindexer/src/stake/slashings.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -52,6 +52,7 @@ impl AppView for Slashings { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?; let ik = IdentityKey::try_from( diff --git a/crates/bin/pindexer/src/stake/undelegation_txs.rs b/crates/bin/pindexer/src/stake/undelegation_txs.rs index 7038b91c41..d4f9da26e4 100644 --- a/crates/bin/pindexer/src/stake/undelegation_txs.rs +++ b/crates/bin/pindexer/src/stake/undelegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; @@ -54,6 +54,7 @@ impl AppView for UndelegationTxs { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<()> { let pe = pb::EventUndelegate::from_event(event.as_ref())?; diff --git a/crates/bin/pindexer/src/stake/validator_set.rs b/crates/bin/pindexer/src/stake/validator_set.rs index 8cd0e18bd4..67472ab403 100644 --- a/crates/bin/pindexer/src/stake/validator_set.rs +++ b/crates/bin/pindexer/src/stake/validator_set.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use anyhow::{anyhow, Context, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_app::genesis::AppState; use penumbra_asset::asset; @@ -68,6 +68,7 @@ impl AppView for ValidatorSet { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { match event.event.kind.as_str() { "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => { diff --git a/crates/util/cometindex/examples/fmd_clues.rs b/crates/util/cometindex/examples/fmd_clues.rs index 26b529f85e..fbd5fb3d15 100644 --- a/crates/util/cometindex/examples/fmd_clues.rs +++ b/crates/util/cometindex/examples/fmd_clues.rs @@ -1,5 +1,7 @@ use anyhow::Result; -use cometindex::{async_trait, AppView, ContextualizedEvent, Indexer, PgTransaction}; +use clap::Parser; +use cometindex::{async_trait, opt::Options, AppView, ContextualizedEvent, Indexer, PgTransaction}; +use sqlx::PgPool; // This example is silly because it doesn't do any "compilation" of the raw // events, so it's only useful as an example of exercising the harness and the @@ -39,6 +41,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example ( &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { // this is just an example in the integration tests, so we don't want to do any // - queries against existing table state @@ -72,7 +75,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example ( #[tokio::main] async fn main() -> Result<()> { - Indexer::new() + Indexer::new(Options::parse()) .with_default_tracing() // add as many indexers as you want .with_index(FmdCluesExample {}) diff --git a/crates/util/cometindex/src/index.rs b/crates/util/cometindex/src/index.rs index 7e629cbda3..f90d666b42 100644 --- a/crates/util/cometindex/src/index.rs +++ b/crates/util/cometindex/src/index.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +pub use sqlx::PgPool; use sqlx::{Postgres, Transaction}; use crate::ContextualizedEvent; @@ -20,5 +21,6 @@ pub trait AppView: std::fmt::Debug { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + src_db: &PgPool, ) -> Result<(), anyhow::Error>; } diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs index e620838dd3..733df4c71b 100644 --- a/crates/util/cometindex/src/indexer.rs +++ b/crates/util/cometindex/src/indexer.rs @@ -1,9 +1,8 @@ use std::pin::Pin; use anyhow::{Context as _, Result}; -use clap::Parser; use futures::{Stream, StreamExt, TryStreamExt}; -use sqlx::PgPool; +use sqlx::{postgres::PgPoolOptions, PgPool}; use tap::{Tap, TapFallible, TapOptional}; use tendermint::abci; use tracing::{debug, info}; @@ -16,9 +15,9 @@ pub struct Indexer { } impl Indexer { - pub fn new() -> Self { + pub fn new(opts: Options) -> Self { Self { - opts: Options::parse(), + opts, indexes: Vec::new(), } } @@ -60,7 +59,22 @@ impl Indexer { indexes, } = self; - let src_db = PgPool::connect(&src_database_url).await?; + // Create a source db, with, for sanity, some read only settings. + // These will be overrideable by a consumer who knows what they're doing, + // but prevents basic mistakes. + // c.f. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811 + let src_db = PgPoolOptions::new() + .after_connect(|conn, _| { + Box::pin(async move { + sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") + .execute(conn) + .await?; + Ok(()) + }) + }) + .connect(&src_database_url) + .await?; + let dst_db = PgPool::connect(&dst_database_url).await?; // Check if the destination db is initialized @@ -169,7 +183,7 @@ impl Indexer { for index in indexes { if index.is_relevant(&event.as_ref().kind) { tracing::debug!(?event, ?index, "relevant to index"); - index.index_event(&mut dbtx, &event).await?; + index.index_event(&mut dbtx, &event, &src_db).await?; } } // Mark that we got to at least this event diff --git a/crates/util/cometindex/src/lib.rs b/crates/util/cometindex/src/lib.rs index f7ae442713..907dd6cced 100644 --- a/crates/util/cometindex/src/lib.rs +++ b/crates/util/cometindex/src/lib.rs @@ -5,7 +5,7 @@ pub mod indexer; pub mod opt; pub use contextualized::ContextualizedEvent; -pub use index::{AppView, PgTransaction}; +pub use index::{AppView, PgPool, PgTransaction}; pub use indexer::Indexer; pub use async_trait::async_trait; diff --git a/crates/util/cometindex/src/opt.rs b/crates/util/cometindex/src/opt.rs index 638c9c3a47..75381de6f2 100644 --- a/crates/util/cometindex/src/opt.rs +++ b/crates/util/cometindex/src/opt.rs @@ -4,7 +4,7 @@ use anyhow::{Error, Result}; use clap::Parser; /// This struct represents the command-line options -#[derive(Debug, Parser)] +#[derive(Clone, Debug, Parser)] #[clap( name = "cometindex", about = "processes raw events emitted by cometbft applications",