diff --git a/Cargo.lock b/Cargo.lock index 115996efc5..951f142cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5809,9 +5809,15 @@ version = "0.79.0" dependencies = [ "anyhow", "cometindex", + "penumbra-app", + "penumbra-asset", + "penumbra-num", "penumbra-proto", "penumbra-shielded-pool", + "penumbra-stake", + "serde_json", "tokio", + "tracing", ] [[package]] diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index 769af3d6ad..958b49ddd5 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -13,6 +13,12 @@ publish = false [dependencies] cometindex = {workspace = true} penumbra-shielded-pool = {workspace = true, default-features = false} +penumbra-stake = {workspace = true, default-features = false} +penumbra-app = {workspace = true, default-features = false} +penumbra-num = {workspace = true, default-features = false} +penumbra-asset = {workspace = true, default-features = false} penumbra-proto = {workspace = true, default-features = false} tokio = {workspace = true, features = ["full"]} anyhow = {workspace = true} +serde_json = {workspace = true} +tracing = {workspace = true} diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 380eeca5f9..693924626f 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -5,5 +5,10 @@ pub trait IndexerExt: Sized { impl IndexerExt for cometindex::Indexer { fn with_default_penumbra_app_views(self) -> Self { self.with_index(crate::shielded_pool::fmd::ClueSet {}) + .with_index(crate::stake::ValidatorSet {}) + .with_index(crate::stake::Slashings {}) + .with_index(crate::stake::MissedBlocks {}) + .with_index(crate::stake::DelegationTxs {}) + .with_index(crate::stake::UndelegationTxs {}) } } diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index d174c59f5e..11b0ac6603 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -1,4 +1,8 @@ 
pub use cometindex::{AppView, Indexer}; mod indexer_ext; +pub use indexer_ext::IndexerExt; + pub mod shielded_pool; + +pub mod stake; diff --git a/crates/bin/pindexer/src/main.rs b/crates/bin/pindexer/src/main.rs index 57ce6f0eab..081a3ea274 100644 --- a/crates/bin/pindexer/src/main.rs +++ b/crates/bin/pindexer/src/main.rs @@ -1,11 +1,11 @@ use anyhow::Result; -use pindexer::{shielded_pool::fmd::ClueSet, Indexer}; +use pindexer::{Indexer, IndexerExt as _}; #[tokio::main] async fn main() -> Result<()> { Indexer::new() .with_default_tracing() - .with_index(ClueSet {}) + .with_default_penumbra_app_views() .run() .await?; diff --git a/crates/bin/pindexer/src/shielded_pool/fmd.rs b/crates/bin/pindexer/src/shielded_pool/fmd.rs index d6823b0347..c23a18144d 100644 --- a/crates/bin/pindexer/src/shielded_pool/fmd.rs +++ b/crates/bin/pindexer/src/shielded_pool/fmd.rs @@ -6,7 +6,11 @@ pub struct ClueSet {} #[async_trait] impl AppView for ClueSet { - async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { sqlx::query( // table name is module path + struct name " diff --git a/crates/bin/pindexer/src/stake.rs b/crates/bin/pindexer/src/stake.rs new file mode 100644 index 0000000000..0778ad7a2c --- /dev/null +++ b/crates/bin/pindexer/src/stake.rs @@ -0,0 +1,11 @@ +pub use delegation_txs::DelegationTxs; +pub use missed_blocks::MissedBlocks; +pub use slashings::Slashings; +pub use undelegation_txs::UndelegationTxs; +pub use validator_set::ValidatorSet; + +mod delegation_txs; +mod missed_blocks; +mod slashings; +mod undelegation_txs; +mod validator_set; diff --git a/crates/bin/pindexer/src/stake/delegation_txs.rs b/crates/bin/pindexer/src/stake/delegation_txs.rs new file mode 100644 index 0000000000..8c04bb0b17 --- /dev/null +++ b/crates/bin/pindexer/src/stake/delegation_txs.rs @@ -0,0 +1,82 @@ +use anyhow::{anyhow, Result}; +use 
cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use penumbra_num::Amount; +use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; + +#[derive(Debug)] +pub struct DelegationTxs {} + +#[async_trait] +impl AppView for DelegationTxs { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _app_state: &serde_json::Value, + ) -> Result<()> { + // Create the table + sqlx::query( + "CREATE TABLE stake_delegation_txs ( + id SERIAL PRIMARY KEY, + validator_ik BYTEA NOT NULL, + amount BIGINT NOT NULL, + height BIGINT NOT NULL, + tx_hash BYTEA NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + // Create index on validator_ik + sqlx::query("CREATE INDEX idx_stake_delegation_txs_validator_ik ON stake_delegation_txs(validator_ik);") + .execute(dbtx.as_mut()) + .await?; + + // Create descending index on height + sqlx::query( + "CREATE INDEX idx_stake_delegation_txs_height ON stake_delegation_txs(height DESC);", + ) + .execute(dbtx.as_mut()) + .await?; + + // Create composite index on validator_ik and height (descending) + sqlx::query("CREATE INDEX idx_stake_delegation_txs_validator_ik_height ON stake_delegation_txs(validator_ik, height DESC);") + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + type_str == "penumbra.core.component.stake.v1.EventDelegate" + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + ) -> Result<()> { + let pe = pb::EventDelegate::from_event(event.as_ref())?; + + let ik_bytes = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? 
+ .ik; + + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, + )?; + + sqlx::query( + "INSERT INTO stake_delegation_txs (validator_ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)" + ) + .bind(&ik_bytes) + .bind(amount.value() as i64) + .bind(event.block_height as i64) + .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/stake/missed_blocks.rs b/crates/bin/pindexer/src/stake/missed_blocks.rs new file mode 100644 index 0000000000..59282c8182 --- /dev/null +++ b/crates/bin/pindexer/src/stake/missed_blocks.rs @@ -0,0 +1,72 @@ +use anyhow::Result; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; + +use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; + +#[derive(Debug)] +pub struct MissedBlocks {} + +#[async_trait] +impl AppView for MissedBlocks { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + // Create the table + sqlx::query( + "CREATE TABLE stake_missed_blocks ( + id SERIAL PRIMARY KEY, + height BIGINT NOT NULL, + ik BYTEA NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + // Create descending index on height + sqlx::query( + "CREATE INDEX idx_stake_missed_blocks_height ON stake_missed_blocks(height DESC);", + ) + .execute(dbtx.as_mut()) + .await?; + + // Create index on ik + sqlx::query("CREATE INDEX idx_stake_missed_blocks_ik ON stake_missed_blocks(ik);") + .execute(dbtx.as_mut()) + .await?; + + // Create composite index on height (descending) and ik + sqlx::query("CREATE INDEX idx_stake_missed_blocks_height_ik ON stake_missed_blocks(height DESC, ik);") + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + type_str == "penumbra.core.component.stake.v1.EventValidatorMissedBlock" + } + + async fn 
index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?; + let ik_bytes = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .ik; + + let height = event.block_height; + + sqlx::query("INSERT INTO stake_missed_blocks (height, ik) VALUES ($1, $2)") + .bind(height as i64) + .bind(ik_bytes) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/stake/slashings.rs b/crates/bin/pindexer/src/stake/slashings.rs new file mode 100644 index 0000000000..19c3fe8281 --- /dev/null +++ b/crates/bin/pindexer/src/stake/slashings.rs @@ -0,0 +1,83 @@ +use anyhow::{anyhow, Result}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; + +use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; +use penumbra_stake::IdentityKey; + +#[derive(Debug)] +pub struct Slashings {} + +#[async_trait] +impl AppView for Slashings { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + sqlx::query( + "CREATE TABLE stake_slashings ( + id SERIAL PRIMARY KEY, + height BIGINT NOT NULL, + ik BYTEA NOT NULL, + epoch_index BIGINT NOT NULL, + penalty TEXT NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE INDEX idx_stake_slashings_height ON stake_slashings(height);") + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE INDEX idx_stake_slashings_ik ON stake_slashings(ik);") + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE INDEX idx_stake_slashings_height_ik ON stake_slashings(height, ik);") + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + match type_str { + "penumbra.core.component.stake.v1.EventSlashingPenaltyApplied" => true, + _ => false, + } + } + + async fn index_event( + &self, + dbtx: 
&mut PgTransaction, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + + let height = event.block_height; + let epoch_index = pe.epoch_index; + + let penalty_json = serde_json::to_string( + &pe.new_penalty + .ok_or_else(|| anyhow!("missing new_penalty"))?, + )?; + + sqlx::query( + "INSERT INTO stake_slashings (height, ik, epoch_index, penalty) + VALUES ($1, $2, $3, $4)", + ) + .bind(height as i64) + .bind(ik.to_bytes()) + .bind(epoch_index as i64) + .bind(penalty_json) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/stake/undelegation_txs.rs b/crates/bin/pindexer/src/stake/undelegation_txs.rs new file mode 100644 index 0000000000..7038b91c41 --- /dev/null +++ b/crates/bin/pindexer/src/stake/undelegation_txs.rs @@ -0,0 +1,82 @@ +use anyhow::{anyhow, Result}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use penumbra_num::Amount; +use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; + +#[derive(Debug)] +pub struct UndelegationTxs {} + +#[async_trait] +impl AppView for UndelegationTxs { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _app_state: &serde_json::Value, + ) -> Result<()> { + // Create the table + sqlx::query( + "CREATE TABLE stake_undelegation_txs ( + id SERIAL PRIMARY KEY, + validator_ik BYTEA NOT NULL, + amount BIGINT NOT NULL, + height BIGINT NOT NULL, + tx_hash BYTEA NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + // Create index on validator_ik + sqlx::query("CREATE INDEX idx_stake_undelegation_txs_validator_ik ON stake_undelegation_txs(validator_ik);") + .execute(dbtx.as_mut()) + .await?; + + // Create descending index on height + sqlx::query( + "CREATE INDEX idx_stake_undelegation_txs_height ON stake_undelegation_txs(height 
DESC);", + ) + .execute(dbtx.as_mut()) + .await?; + + // Create composite index on validator_ik and height (descending) + sqlx::query("CREATE INDEX idx_stake_undelegation_txs_validator_ik_height ON stake_undelegation_txs(validator_ik, height DESC);") + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + type_str == "penumbra.core.component.stake.v1.EventUndelegate" + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + ) -> Result<()> { + let pe = pb::EventUndelegate::from_event(event.as_ref())?; + + let ik_bytes = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .ik; + + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, + )?; + + sqlx::query( + "INSERT INTO stake_undelegation_txs (validator_ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)" + ) + .bind(&ik_bytes) + .bind(amount.value() as i64) + .bind(event.block_height as i64) + .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) 
+ .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/stake/validator_set.rs b/crates/bin/pindexer/src/stake/validator_set.rs new file mode 100644 index 0000000000..8cd0e18bd4 --- /dev/null +++ b/crates/bin/pindexer/src/stake/validator_set.rs @@ -0,0 +1,370 @@ +use std::collections::BTreeMap; + +use anyhow::{anyhow, Context, Result}; +use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; + +use penumbra_app::genesis::AppState; +use penumbra_asset::asset; +use penumbra_num::Amount; +use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; +use penumbra_stake::{ + validator::{self, Validator}, + IdentityKey, +}; + +#[derive(Debug)] +pub struct ValidatorSet {} + +#[async_trait] +impl AppView for ValidatorSet { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + sqlx::query( + // table name is module path + struct name + // note: protobuf data is encoded as protojson for ease of consumers + // hence TEXT fields + "CREATE TABLE stake_validator_set ( + id SERIAL PRIMARY KEY, + ik BYTEA NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL, + voting_power BIGINT NOT NULL, + queued_delegations BIGINT NOT NULL, + queued_undelegations BIGINT NOT NULL, + validator_state TEXT NOT NULL, + bonding_state TEXT NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);") + .execute(dbtx.as_mut()) + .await?; + + let app_state: penumbra_app::genesis::AppState = + serde_json::from_value(app_state.clone()).context("error decoding app_state json")?; + + add_genesis_validators(dbtx, &app_state).await?; + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + match type_str { + "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => true, + "penumbra.core.component.stake.v1.EventDelegate" => true, + 
"penumbra.core.component.stake.v1.EventUndelegate" => true, + "penumbra.core.component.stake.v1.EventValidatorVotingPowerChange" => true, + "penumbra.core.component.stake.v1.EventValidatorStateChange" => true, + "penumbra.core.component.stake.v1.EventValidatorBondingStateChange" => true, + _ => false, + } + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + match event.event.kind.as_str() { + "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => { + let pe = pb::EventValidatorDefinitionUpload::from_event(event.as_ref())?; + let val = Validator::try_from( + pe.validator + .ok_or_else(|| anyhow!("missing validator in event"))?, + )?; + + handle_upload(dbtx, val).await?; + } + "penumbra.core.component.stake.v1.EventDelegate" => { + let pe = pb::EventDelegate::from_event(event.as_ref())?; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow!("missing amount in event"))?, + )?; + + handle_delegate(dbtx, ik, amount).await?; + } + "penumbra.core.component.stake.v1.EventUndelegate" => { + let pe = pb::EventUndelegate::from_event(event.as_ref())?; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow!("missing amount in event"))?, + )?; + handle_undelegate(dbtx, ik, amount).await?; + } + "penumbra.core.component.stake.v1.EventValidatorVotingPowerChange" => { + let pe = pb::EventValidatorVotingPowerChange::from_event(event.as_ref())?; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + let voting_power = Amount::try_from( + pe.voting_power + .ok_or_else(|| anyhow!("missing amount in event"))?, + )?; + handle_voting_power_change(dbtx, ik, voting_power).await?; + } + 
"penumbra.core.component.stake.v1.EventValidatorStateChange" => { + let pe = pb::EventValidatorStateChange::from_event(event.as_ref())?; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + let state = validator::State::try_from( + pe.state.ok_or_else(|| anyhow!("missing state in event"))?, + )?; + handle_validator_state_change(dbtx, ik, state).await?; + } + "penumbra.core.component.stake.v1.EventValidatorBondingStateChange" => { + let pe = pb::EventValidatorBondingStateChange::from_event(event.as_ref())?; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + let bonding_state = validator::BondingState::try_from( + pe.bonding_state + .ok_or_else(|| anyhow!("missing bonding_state in event"))?, + )?; + handle_validator_bonding_state_change(dbtx, ik, bonding_state).await?; + } + _ => {} + } + + Ok(()) + } +} + +async fn add_genesis_validators<'a>( + dbtx: &mut PgTransaction<'a>, + app_state: &AppState, +) -> Result<()> { + let content = app_state + .content() + .ok_or_else(|| anyhow::anyhow!("cannot initialize indexer from checkpoint genesis"))?; + + // Given a genesis validator, we need to figure out its delegations at + // genesis by getting its delegation token then summing up all the allocations. + // Build up a table of the total allocations first. 
+ let mut allos = BTreeMap::<asset::Id, Amount>::new(); + for allo in &content.shielded_pool_content.allocations { + let value = allo.value(); + let sum = allos.entry(value.asset_id).or_default(); + *sum = sum + .checked_add(&value.amount) + .ok_or_else(|| anyhow::anyhow!("overflow adding genesis allos (should not happen)"))?; + } + + for val in &content.stake_content.validators { + // FIXME: this shouldn't be a proto type but now that has been propagated + // all through the rest of the code for no reason + let val = Validator::try_from(val.clone())?; + let delegation_amount = allos.get(&val.token().id()).cloned().unwrap_or_default(); + + // insert sql + sqlx::query( + "INSERT INTO stake_validator_set ( + ik, name, definition, voting_power, queued_delegations, + queued_undelegations, validator_state, bonding_state + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + ) + .bind(val.identity_key.to_bytes()) + .bind(val.name.clone()) + .bind(serde_json::to_string(&val).expect("can serialize")) + .bind(delegation_amount.value() as i64) + .bind(0) // queued_delegations + .bind(0) // queued_undelegations + .bind(serde_json::to_string(&validator::State::Active).unwrap()) // see add_genesis_validator + .bind(serde_json::to_string(&validator::BondingState::Bonded).unwrap()) // see add_genesis_validator + .execute(dbtx.as_mut()) + .await?; + } + + Ok(()) +} + +async fn handle_upload<'a>(dbtx: &mut PgTransaction<'a>, val: Validator) -> Result<()> { + // First, check if the validator already exists + let exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM stake_validator_set WHERE ik = $1)") + .bind(&val.identity_key.to_bytes()) + .fetch_one(dbtx.as_mut()) + .await?; + + if exists { + // Update existing validator, leaving all the other data like state, VP etc unchanged + sqlx::query( + "UPDATE stake_validator_set SET + name = $2, + definition = $3 + WHERE ik = $1", + ) + .bind(val.identity_key.to_bytes()) + .bind(val.name.clone()) + .bind(serde_json::to_string(&val).expect("can
serialize")) + .execute(dbtx.as_mut()) + .await?; + } else { + // Insert new validator + sqlx::query( + "INSERT INTO stake_validator_set ( + ik, name, definition, voting_power, queued_delegations, + queued_undelegations, validator_state, bonding_state + ) + VALUES ($1, $2, $3, 0, 0, 0, $4, $5)", + ) + .bind(val.identity_key.to_bytes()) + .bind(val.name.clone()) + .bind(serde_json::to_string(&val).expect("can serialize")) + .bind(serde_json::to_string(&validator::State::Defined).expect("can serialize")) // ValidatorManager::add_validator + .bind(serde_json::to_string(&validator::BondingState::Unbonded).expect("can serialize")) // ValidatorManager::add_validator + .execute(dbtx.as_mut()) + .await?; + } + + Ok(()) +} + +async fn handle_delegate<'a>( + dbtx: &mut PgTransaction<'a>, + ik: IdentityKey, + amount: Amount, +) -> Result<()> { + // Update the validator's voting power and queued delegations + let rows_affected = sqlx::query( + "UPDATE stake_validator_set + SET + queued_delegations = queued_delegations + $2 + WHERE ik = $1", + ) + .bind(ik.to_bytes()) + .bind(amount.value() as i64) + .execute(dbtx.as_mut()) + .await? + .rows_affected(); + + // Check if the update was successful + if rows_affected == 0 { + anyhow::bail!("No validator found with the given identity key"); + } + + Ok(()) +} + +async fn handle_undelegate<'a>( + dbtx: &mut PgTransaction<'a>, + ik: IdentityKey, + amount: Amount, +) -> Result<()> { + // Update only the queued undelegations + let rows_affected = sqlx::query( + "UPDATE stake_validator_set + SET + queued_undelegations = queued_undelegations + $2 + WHERE ik = $1", + ) + .bind(ik.to_bytes()) + .bind(amount.value() as i64) + .execute(dbtx.as_mut()) + .await? 
+ .rows_affected(); + + // Check if the update was successful + if rows_affected == 0 { + anyhow::bail!("No validator found with the given identity key"); + } + + Ok(()) +} + +async fn handle_voting_power_change<'a>( + dbtx: &mut PgTransaction<'a>, + ik: IdentityKey, + voting_power: Amount, +) -> Result<()> { + // Update the validator's voting power and reset queued delegations/undelegations + let rows_affected = sqlx::query( + "UPDATE stake_validator_set + SET + voting_power = $2, + queued_delegations = 0, + queued_undelegations = 0 + WHERE ik = $1", + ) + .bind(ik.to_bytes()) + .bind(voting_power.value() as i64) + .execute(dbtx.as_mut()) + .await? + .rows_affected(); + + // Check if the update was successful + if rows_affected == 0 { + anyhow::bail!("No validator found with the given identity key"); + } + + Ok(()) +} + +async fn handle_validator_state_change<'a>( + dbtx: &mut PgTransaction<'a>, + ik: IdentityKey, + state: validator::State, +) -> Result<()> { + // Update the validator's state + let rows_affected = sqlx::query( + "UPDATE stake_validator_set + SET + validator_state = $2 + WHERE ik = $1", + ) + .bind(ik.to_bytes()) + .bind(serde_json::to_string(&state).expect("can serialize")) + .execute(dbtx.as_mut()) + .await? + .rows_affected(); + + // Check if the update was successful + if rows_affected == 0 { + anyhow::bail!("No validator found with the given identity key"); + } + + Ok(()) +} + +async fn handle_validator_bonding_state_change<'a>( + dbtx: &mut PgTransaction<'a>, + ik: IdentityKey, + bonding_state: validator::BondingState, +) -> Result<()> { + // Update the validator's bonding state + let rows_affected = sqlx::query( + "UPDATE stake_validator_set + SET + bonding_state = $2 + WHERE ik = $1", + ) + .bind(ik.to_bytes()) + .bind(serde_json::to_string(&bonding_state).expect("can serialize")) + .execute(dbtx.as_mut()) + .await? 
+ .rows_affected(); + + // Check if the update was successful + if rows_affected == 0 { + anyhow::bail!("No validator found with the given identity key"); + } + + Ok(()) +} diff --git a/crates/core/app/src/genesis.rs b/crates/core/app/src/genesis.rs index 8b07b24b68..becf38cb84 100644 --- a/crates/core/app/src/genesis.rs +++ b/crates/core/app/src/genesis.rs @@ -23,6 +23,15 @@ pub enum AppState { Checkpoint(Vec<u8>), } +impl AppState { + pub fn content(&self) -> Option<&Content> { + match self { + AppState::Content(content) => Some(content), + _ => None, + } + } +} + #[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(try_from = "pb::GenesisContent", into = "pb::GenesisContent")] pub struct Content { diff --git a/crates/core/component/stake/src/identity_key.rs b/crates/core/component/stake/src/identity_key.rs index 3ffc9e19b9..98c85eb986 100644 --- a/crates/core/component/stake/src/identity_key.rs +++ b/crates/core/component/stake/src/identity_key.rs @@ -22,6 +22,12 @@ use decaf377_rdsa::{SpendAuth, VerificationKeyBytes}; #[serde(try_from = "pb::IdentityKey", into = "pb::IdentityKey")] pub struct IdentityKey(pub VerificationKeyBytes); +impl IdentityKey { + pub fn to_bytes(&self) -> [u8; 32] { + self.0.into() + } +} + // IMPORTANT: Changing this implementation is state-breaking.
impl std::str::FromStr for IdentityKey { type Err = anyhow::Error; diff --git a/crates/core/component/stake/src/validator.rs b/crates/core/component/stake/src/validator.rs index 858b182a44..734506aa5a 100644 --- a/crates/core/component/stake/src/validator.rs +++ b/crates/core/component/stake/src/validator.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use serde_unit_struct::{Deserialize_unit_struct, Serialize_unit_struct}; use serde_with::{serde_as, DisplayFromStr}; -use crate::{FundingStream, FundingStreams, GovernanceKey, IdentityKey}; +use crate::{DelegationToken, FundingStream, FundingStreams, GovernanceKey, IdentityKey}; mod bonding; mod definition; @@ -69,6 +69,12 @@ pub struct Validator { pub sequence_number: u32, } +impl Validator { + pub fn token(&self) -> DelegationToken { + DelegationToken::new(self.identity_key.clone()) + } +} + #[serde_as] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct ValidatorToml { diff --git a/crates/util/cometindex/examples/fmd_clues.rs b/crates/util/cometindex/examples/fmd_clues.rs index f7bae62ed4..26b529f85e 100644 --- a/crates/util/cometindex/examples/fmd_clues.rs +++ b/crates/util/cometindex/examples/fmd_clues.rs @@ -12,7 +12,11 @@ struct FmdCluesExample {} #[async_trait] impl AppView for FmdCluesExample { - async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { sqlx::query( " CREATE TABLE IF NOT EXISTS fmd_clues_example ( diff --git a/crates/util/cometindex/src/index.rs b/crates/util/cometindex/src/index.rs index d3348c17ad..7e629cbda3 100644 --- a/crates/util/cometindex/src/index.rs +++ b/crates/util/cometindex/src/index.rs @@ -8,7 +8,11 @@ pub type PgTransaction<'a> = Transaction<'a, Postgres>; /// Represents a specific index of raw event data. 
#[async_trait] pub trait AppView: std::fmt::Debug { - async fn init_chain(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error>; + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error>; fn is_relevant(&self, type_str: &str) -> bool; diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs index a1d229d707..e620838dd3 100644 --- a/crates/util/cometindex/src/indexer.rs +++ b/crates/util/cometindex/src/indexer.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use anyhow::Result; +use anyhow::{Context as _, Result}; use clap::Parser; use futures::{Stream, StreamExt, TryStreamExt}; use sqlx::PgPool; @@ -33,10 +33,14 @@ impl Indexer { self } - async fn create_dst_tables(pool: &PgPool, indexes: &[Box<dyn AppView>]) -> Result<()> { + async fn create_dst_tables( + pool: &PgPool, + indexes: &[Box<dyn AppView>], + app_state: &serde_json::Value, + ) -> Result<()> { let mut dbtx = pool.begin().await?; for index in indexes { - index.init_chain(&mut dbtx).await?; + index.init_chain(&mut dbtx, app_state).await?; } dbtx.commit().await?; Ok(()) @@ -51,6 +55,7 @@ impl Indexer { dst_database_url, chain_id: _, poll_ms, + genesis_json, }, indexes, } = self; @@ -58,12 +63,38 @@ impl Indexer { let src_db = PgPool::connect(&src_database_url).await?; let dst_db = PgPool::connect(&dst_database_url).await?; - Self::create_dst_tables(&dst_db, &indexes).await?; + // Check if the destination db is initialized + let dst_db_initialized: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'index_watermark' + )", + ) + .fetch_one(&dst_db) + .await?; + + if !dst_db_initialized { + tracing::info!("no watermark found, initializing with genesis data"); + + // Create the table if it doesn't exist + sqlx::query("CREATE TABLE index_watermark (events_rowid BIGINT NOT NULL)") + .execute(&dst_db) + .await?; - // Create the index_watermark table if it does not exist -
sqlx::query("CREATE TABLE IF NOT EXISTS index_watermark (events_rowid BIGINT NOT NULL)") - .execute(&dst_db) - .await?; + // Load the genesis JSON to be used populating initial tables + let genesis_content: serde_json::Value = serde_json::from_str( + &std::fs::read_to_string(genesis_json) + .context("error reading provided genesis.json file")?, + ) + .context("error parsing provided genesis.json file")?; + let app_state = genesis_content + .get("app_state") + .ok_or_else(|| anyhow::anyhow!("no app_state key in genesis.json"))?; + + Self::create_dst_tables(&dst_db, &indexes, app_state).await?; + } else { + tracing::info!("skipping genesis initialization"); + } loop { Self::tick(&src_db, &dst_db, &indexes).await?; diff --git a/crates/util/cometindex/src/opt.rs b/crates/util/cometindex/src/opt.rs index 2331c2d972..638c9c3a47 100644 --- a/crates/util/cometindex/src/opt.rs +++ b/crates/util/cometindex/src/opt.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{path::PathBuf, time::Duration}; use anyhow::{Error, Result}; use clap::Parser; @@ -26,6 +26,10 @@ pub struct Options { /// The rate at which to poll for changes, in milliseconds. #[clap(short, long, default_value = "500", value_parser = parse_poll_ms)] pub poll_ms: Duration, + + /// A file path for the genesis file to use when initializing the indexer. + #[clap(short, long)] + pub genesis_json: PathBuf, } /// Parses a string containing a [`Duration`], represented as a number of milliseconds.