From a2ffd8a79f1b397e81df7b7e31b256d545cdf004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Tue, 23 Jul 2024 10:55:25 -0700 Subject: [PATCH] Make pindexer more easily embeddable as a library (#4712) ## Describe your changes We don't expose cometindex as a library, but we do pindexer, and this adds a few tweaks to make pindexer possible to run inside an external piece of Rust code. This is useful, because it allows people to colocate their indexer and their block explorer, for example. ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > Doesn't touch the core protocol. --- Cargo.lock | 1 + crates/bin/pindexer/Cargo.toml | 23 ++++++++-------- crates/bin/pindexer/src/block.rs | 3 ++- crates/bin/pindexer/src/lib.rs | 2 +- crates/bin/pindexer/src/main.rs | 5 ++-- crates/bin/pindexer/src/shielded_pool/fmd.rs | 3 ++- .../bin/pindexer/src/stake/delegation_txs.rs | 3 ++- .../bin/pindexer/src/stake/missed_blocks.rs | 3 ++- crates/bin/pindexer/src/stake/slashings.rs | 3 ++- .../pindexer/src/stake/undelegation_txs.rs | 3 ++- .../bin/pindexer/src/stake/validator_set.rs | 3 ++- crates/util/cometindex/examples/fmd_clues.rs | 7 +++-- crates/util/cometindex/src/index.rs | 2 ++ crates/util/cometindex/src/indexer.rs | 26 ++++++++++++++----- crates/util/cometindex/src/lib.rs | 2 +- crates/util/cometindex/src/opt.rs | 2 +- 16 files changed, 60 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e150dafeeb..22c91257f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5808,6 +5808,7 @@ name = "pindexer" version = "0.79.0" dependencies = [ "anyhow", + "clap", "cometindex", "penumbra-app", "penumbra-asset", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index e32566c4c4..64cf3204bc 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -11,15 +11,16 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = {workspace = true} +clap = {workspace = true} +cometindex = {workspace = true} +penumbra-shielded-pool = {workspace = true, default-features = false} +penumbra-stake = {workspace = true, default-features = false} +penumbra-app = {workspace = true, default-features = false} +penumbra-num = {workspace = true, default-features = false} +penumbra-asset = {workspace = true, default-features = false} +penumbra-proto = {workspace = true, default-features = false} +tokio = {workspace = true, features = ["full"]} +serde_json = {workspace = true} sqlx = { workspace = true, features = ["chrono"] } -cometindex = { workspace = true } -penumbra-shielded-pool = { workspace = true, default-features = false } -penumbra-stake = { workspace = true, default-features = false } -penumbra-app = { workspace = true, default-features = false } -penumbra-num = { workspace = true, default-features = false } -penumbra-asset = { workspace = true, default-features = false } -penumbra-proto = { workspace = true, default-features = false } -tokio = { workspace = true, features = ["full"] } -anyhow = { workspace = true } -serde_json = { workspace = true } -tracing = { workspace = true } +tracing = {workspace = true} diff --git a/crates/bin/pindexer/src/block.rs b/crates/bin/pindexer/src/block.rs index 585d78f95e..221fcfb188 100644 --- a/crates/bin/pindexer/src/block.rs +++ b/crates/bin/pindexer/src/block.rs @@ -1,6 +1,6 @@ use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent}; -use sqlx::types::chrono::DateTime; +use sqlx::{types::chrono::DateTime, PgPool}; #[derive(Debug)] pub struct Block {} @@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS block_details ( &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventBlockRoot::from_event(event.as_ref())?; let timestamp = pe.timestamp.expect("Block has no timestamp"); diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index bacfb8f57c..0f942bc374 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -1,4 +1,4 @@ -pub use cometindex::{AppView, Indexer}; +pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool, PgTransaction}; mod indexer_ext; pub use indexer_ext::IndexerExt; diff --git a/crates/bin/pindexer/src/main.rs b/crates/bin/pindexer/src/main.rs index 8653858351..71989105ff 100644 --- a/crates/bin/pindexer/src/main.rs +++ b/crates/bin/pindexer/src/main.rs @@ -1,10 +1,11 @@ use anyhow::Result; +use clap::Parser as _; use pindexer::block::Block; -use pindexer::{Indexer, IndexerExt as _}; +use pindexer::{Indexer, IndexerExt as _, Options}; #[tokio::main] async fn main() -> Result<()> { - Indexer::new() + Indexer::new(Options::parse()) .with_default_tracing() .with_default_penumbra_app_views() .with_index(Block {}) diff --git a/crates/bin/pindexer/src/shielded_pool/fmd.rs b/crates/bin/pindexer/src/shielded_pool/fmd.rs index c23a18144d..50f4942dd3 100644 --- a/crates/bin/pindexer/src/shielded_pool/fmd.rs +++ b/crates/bin/pindexer/src/shielded_pool/fmd.rs @@ -1,4 +1,4 @@ -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent}; #[derive(Debug)] @@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set ( &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventBroadcastClue::from_event(event.as_ref())?; diff --git a/crates/bin/pindexer/src/stake/delegation_txs.rs b/crates/bin/pindexer/src/stake/delegation_txs.rs index 8c04bb0b17..a8da474a4c 100644 --- a/crates/bin/pindexer/src/stake/delegation_txs.rs +++ b/crates/bin/pindexer/src/stake/delegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; @@ -54,6 +54,7 @@ impl AppView for DelegationTxs { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<()> { let pe = pb::EventDelegate::from_event(event.as_ref())?; diff --git a/crates/bin/pindexer/src/stake/missed_blocks.rs b/crates/bin/pindexer/src/stake/missed_blocks.rs index 59282c8182..7b5115792c 100644 --- a/crates/bin/pindexer/src/stake/missed_blocks.rs +++ b/crates/bin/pindexer/src/stake/missed_blocks.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; @@ -52,6 +52,7 @@ impl AppView for MissedBlocks { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?; let ik_bytes = pe diff --git a/crates/bin/pindexer/src/stake/slashings.rs b/crates/bin/pindexer/src/stake/slashings.rs index 19c3fe8281..1be89b6944 100644 --- a/crates/bin/pindexer/src/stake/slashings.rs +++ b/crates/bin/pindexer/src/stake/slashings.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -52,6 +52,7 @@ impl AppView for Slashings { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?; let ik = IdentityKey::try_from( diff --git a/crates/bin/pindexer/src/stake/undelegation_txs.rs b/crates/bin/pindexer/src/stake/undelegation_txs.rs index 7038b91c41..d4f9da26e4 100644 --- a/crates/bin/pindexer/src/stake/undelegation_txs.rs +++ b/crates/bin/pindexer/src/stake/undelegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; @@ -54,6 +54,7 @@ impl AppView for UndelegationTxs { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<()> { let pe = pb::EventUndelegate::from_event(event.as_ref())?; diff --git a/crates/bin/pindexer/src/stake/validator_set.rs b/crates/bin/pindexer/src/stake/validator_set.rs index 8cd0e18bd4..67472ab403 100644 --- a/crates/bin/pindexer/src/stake/validator_set.rs +++ b/crates/bin/pindexer/src/stake/validator_set.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use anyhow::{anyhow, Context, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; use penumbra_app::genesis::AppState; use penumbra_asset::asset; @@ -68,6 +68,7 @@ impl AppView for ValidatorSet { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { match event.event.kind.as_str() { "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => { diff --git a/crates/util/cometindex/examples/fmd_clues.rs b/crates/util/cometindex/examples/fmd_clues.rs index 26b529f85e..fbd5fb3d15 100644 --- a/crates/util/cometindex/examples/fmd_clues.rs +++ b/crates/util/cometindex/examples/fmd_clues.rs @@ -1,5 +1,7 @@ use anyhow::Result; -use cometindex::{async_trait, AppView, ContextualizedEvent, Indexer, PgTransaction}; +use clap::Parser; +use cometindex::{async_trait, opt::Options, AppView, ContextualizedEvent, Indexer, PgTransaction}; +use sqlx::PgPool; // This example is silly because it doesn't do any "compilation" of the raw // events, so it's only useful as an example of exercising the harness and the @@ -39,6 +41,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example ( &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + _src_db: &PgPool, ) -> Result<(), anyhow::Error> { // this is just an example in the integration tests, so we don't want to do any // - queries against existing table state @@ -72,7 +75,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example ( #[tokio::main] async fn main() -> Result<()> { - Indexer::new() + Indexer::new(Options::parse()) .with_default_tracing() // add as many indexers as you want .with_index(FmdCluesExample {}) diff --git a/crates/util/cometindex/src/index.rs b/crates/util/cometindex/src/index.rs index 7e629cbda3..f90d666b42 100644 --- a/crates/util/cometindex/src/index.rs +++ b/crates/util/cometindex/src/index.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +pub use sqlx::PgPool; use sqlx::{Postgres, Transaction}; use crate::ContextualizedEvent; @@ -20,5 +21,6 @@ pub trait AppView: std::fmt::Debug { &self, dbtx: &mut PgTransaction, event: &ContextualizedEvent, + src_db: &PgPool, ) -> Result<(), anyhow::Error>; } diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs index e620838dd3..733df4c71b 100644 --- a/crates/util/cometindex/src/indexer.rs +++ b/crates/util/cometindex/src/indexer.rs @@ -1,9 +1,8 @@ use std::pin::Pin; use anyhow::{Context as _, Result}; -use clap::Parser; use futures::{Stream, StreamExt, TryStreamExt}; -use sqlx::PgPool; +use sqlx::{postgres::PgPoolOptions, PgPool}; use tap::{Tap, TapFallible, TapOptional}; use tendermint::abci; use tracing::{debug, info}; @@ -16,9 +15,9 @@ pub struct Indexer { } impl Indexer { - pub fn new() -> Self { + pub fn new(opts: Options) -> Self { Self { - opts: Options::parse(), + opts, indexes: Vec::new(), } } @@ -60,7 +59,22 @@ impl Indexer { indexes, } = self; - let src_db = PgPool::connect(&src_database_url).await?; + // Create a source db, with, for sanity, some read only settings. + // These will be overrideable by a consumer who knows what they're doing, + // but prevents basic mistakes. + // c.f. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811 + let src_db = PgPoolOptions::new() + .after_connect(|conn, _| { + Box::pin(async move { + sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") + .execute(conn) + .await?; + Ok(()) + }) + }) + .connect(&src_database_url) + .await?; + let dst_db = PgPool::connect(&dst_database_url).await?; // Check if the destination db is initialized @@ -169,7 +183,7 @@ impl Indexer { for index in indexes { if index.is_relevant(&event.as_ref().kind) { tracing::debug!(?event, ?index, "relevant to index"); - index.index_event(&mut dbtx, &event).await?; + index.index_event(&mut dbtx, &event, &src_db).await?; } } // Mark that we got to at least this event diff --git a/crates/util/cometindex/src/lib.rs b/crates/util/cometindex/src/lib.rs index f7ae442713..907dd6cced 100644 --- a/crates/util/cometindex/src/lib.rs +++ b/crates/util/cometindex/src/lib.rs @@ -5,7 +5,7 @@ pub mod indexer; pub mod opt; pub use contextualized::ContextualizedEvent; -pub use index::{AppView, PgTransaction}; +pub use index::{AppView, PgPool, PgTransaction}; pub use indexer::Indexer; pub use async_trait::async_trait; diff --git a/crates/util/cometindex/src/opt.rs b/crates/util/cometindex/src/opt.rs index 638c9c3a47..75381de6f2 100644 --- a/crates/util/cometindex/src/opt.rs +++ b/crates/util/cometindex/src/opt.rs @@ -4,7 +4,7 @@ use anyhow::{Error, Result}; use clap::Parser; /// This struct represents the command-line options -#[derive(Debug, Parser)] +#[derive(Clone, Debug, Parser)] #[clap( name = "cometindex", about = "processes raw events emitted by cometbft applications",