diff --git a/crates/sui-indexer-alt/migrations/2024-11-02-160143_kv_epochs/down.sql b/crates/sui-indexer-alt/migrations/2024-11-02-160143_kv_epochs/down.sql new file mode 100644 index 0000000000000..f484811e243c0 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-11-02-160143_kv_epochs/down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS kv_epoch_starts; +DROP TABLE IF EXISTS kv_epoch_ends; diff --git a/crates/sui-indexer-alt/migrations/2024-11-02-160143_kv_epochs/up.sql b/crates/sui-indexer-alt/migrations/2024-11-02-160143_kv_epochs/up.sql new file mode 100644 index 0000000000000..1c9a3706fcb6f --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-11-02-160143_kv_epochs/up.sql @@ -0,0 +1,56 @@ +-- Information related to an epoch that is available when it starts +CREATE TABLE IF NOT EXISTS kv_epoch_starts +( + epoch BIGINT PRIMARY KEY, + protocol_version BIGINT NOT NULL, + + -- Inclusive checkpoint lowerbound of the epoch. + cp_lo BIGINT NOT NULL, + -- The timestamp that the epoch starts at. This is always extracted from + -- the system state object. + start_timestamp_ms BIGINT NOT NULL, + -- The reference gas price that will be used for the rest of the epoch. + reference_gas_price BIGINT NOT NULL, + -- BCS serialized SystemState. + system_state BYTEA NOT NULL +); + +-- Information related to an epoch that is available when it ends (after the +-- epoch advancement to the next epoch) +CREATE TABLE IF NOT EXISTS kv_epoch_ends +( + epoch BIGINT PRIMARY KEY, + + -- Exclusive checkpoint upperbound of the epoch. + cp_hi BIGINT NOT NULL, + -- Exclusive transaction upperbound of the epoch. + tx_hi BIGINT NOT NULL, + + -- The epoch ends at the timestamp of its last checkpoint. + end_timestamp_ms BIGINT NOT NULL, + + -- Whether the epoch advancement at the end of this epoch entered safe + -- mode. + safe_mode BOOLEAN NOT NULL, + + -- Staking information after advancement to the next epoch. These fields + -- are extracted from the `SystemEpochInfoEvent` emitted during epoch + -- advancement. If the epoch advancement entered safe mode, these fields + -- will all be NULL (because a safe mode advance epoch does not emit this + -- event). + total_stake BIGINT, + storage_fund_balance BIGINT, + storage_fund_reinvestment BIGINT, + storage_charge BIGINT, + storage_rebate BIGINT, + stake_subsidy_amount BIGINT, + total_gas_fees BIGINT, + total_stake_rewards_distributed + BIGINT, + leftover_storage_fund_inflow + BIGINT, + + -- BCS serialized `Vec` bytes, found in last + -- `CheckpointSummary` of the epoch. + epoch_commitments BYTEA NOT NULL +); diff --git a/crates/sui-indexer-alt/src/bootstrap.rs b/crates/sui-indexer-alt/src/bootstrap.rs index 5b80a8fb8fc52..6d5fe26338b39 100644 --- a/crates/sui-indexer-alt/src/bootstrap.rs +++ b/crates/sui-indexer-alt/src/bootstrap.rs @@ -15,7 +15,10 @@ use tokio_util::sync::CancellationToken; use tracing::info; use crate::{ - models::checkpoints::StoredGenesis, schema::kv_genesis, task::graceful_shutdown, Indexer, + models::{checkpoints::StoredGenesis, epochs::StoredEpochStart}, + schema::{kv_epoch_starts, kv_genesis}, + task::graceful_shutdown, + Indexer, }; /// Ensures the genesis table has been populated before the rest of the indexer is run, and returns @@ -91,6 +94,15 @@ pub async fn bootstrap( initial_protocol_version: system_state.protocol_version() as i64, }; + let epoch_start = StoredEpochStart { + epoch: 0, + protocol_version: system_state.protocol_version() as i64, + cp_lo: 0, + start_timestamp_ms: system_state.epoch_start_timestamp_ms() as i64, + reference_gas_price: system_state.reference_gas_price() as i64, + system_state: bcs::to_bytes(&system_state).context("Failed to serialize SystemState")?, + }; + info!( chain = genesis.chain()?.as_str(), protocol = ?genesis.initial_protocol_version(), @@ -104,5 +116,12 @@ pub async fn bootstrap( .await .context("Failed to write genesis record")?; + diesel::insert_into(kv_epoch_starts::table) + .values(&epoch_start) + .on_conflict_do_nothing() + .execute(&mut conn) + .await + .context("Failed to write genesis epoch start record")?; + Ok(genesis) } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs new file mode 100644 index 0000000000000..fa4fe6f024a6c --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs @@ -0,0 +1,132 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use anyhow::{bail, Context, Result}; +use diesel_async::RunQueryDsl; +use sui_types::{ + event::SystemEpochInfoEvent, + full_checkpoint_content::CheckpointData, + transaction::{TransactionDataAPI, TransactionKind}, +}; + +use crate::{ + db, + models::epochs::StoredEpochEnd, + pipeline::{concurrent::Handler, Processor}, + schema::kv_epoch_ends, +}; + +pub struct KvEpochEnds; + +impl Processor for KvEpochEnds { + const NAME: &'static str = "kv_epoch_ends"; + + type Value = StoredEpochEnd; + + fn process(&self, checkpoint: &Arc) -> Result> { + let CheckpointData { + checkpoint_summary, + transactions, + .. + } = checkpoint.as_ref(); + + let Some(end_of_epoch) = checkpoint_summary.end_of_epoch_data.as_ref() else { + return Ok(vec![]); + }; + + let Some(transaction) = transactions.iter().find(|tx| { + matches!( + tx.transaction.intent_message().value.kind(), + TransactionKind::ChangeEpoch(_) | TransactionKind::EndOfEpochTransaction(_) + ) + }) else { + bail!( + "Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData", + checkpoint_summary.sequence_number, + ); + }; + + if let Some(SystemEpochInfoEvent { + total_stake, + storage_fund_reinvestment, + storage_charge, + storage_rebate, + storage_fund_balance, + stake_subsidy_amount, + total_gas_fees, + total_stake_rewards_distributed, + leftover_storage_fund_inflow, + .. + }) = transaction + .events + .iter() + .flat_map(|events| &events.data) + .find_map(|event| { + event + .is_system_epoch_info_event() + .then(|| bcs::from_bytes(&event.contents)) + }) + .transpose() + .context("Failed to deserialize SystemEpochInfoEvent")? + { + Ok(vec![StoredEpochEnd { + epoch: checkpoint_summary.epoch as i64, + cp_hi: checkpoint_summary.sequence_number as i64 + 1, + tx_hi: checkpoint_summary.network_total_transactions as i64, + end_timestamp_ms: checkpoint_summary.timestamp_ms as i64, + + safe_mode: false, + + total_stake: Some(total_stake as i64), + storage_fund_balance: Some(storage_fund_balance as i64), + storage_fund_reinvestment: Some(storage_fund_reinvestment as i64), + storage_charge: Some(storage_charge as i64), + storage_rebate: Some(storage_rebate as i64), + stake_subsidy_amount: Some(stake_subsidy_amount as i64), + total_gas_fees: Some(total_gas_fees as i64), + total_stake_rewards_distributed: Some(total_stake_rewards_distributed as i64), + leftover_storage_fund_inflow: Some(leftover_storage_fund_inflow as i64), + + epoch_commitments: bcs::to_bytes(&end_of_epoch.epoch_commitments) + .context("Failed to serialize EpochCommitment-s")?, + }]) + } else { + Ok(vec![StoredEpochEnd { + epoch: checkpoint_summary.epoch as i64, + cp_hi: checkpoint_summary.sequence_number as i64 + 1, + tx_hi: checkpoint_summary.network_total_transactions as i64, + end_timestamp_ms: checkpoint_summary.timestamp_ms as i64, + + safe_mode: true, + + total_stake: None, + storage_fund_balance: None, + storage_fund_reinvestment: None, + storage_charge: None, + storage_rebate: None, + stake_subsidy_amount: None, + total_gas_fees: None, + total_stake_rewards_distributed: None, + leftover_storage_fund_inflow: None, + + epoch_commitments: bcs::to_bytes(&end_of_epoch.epoch_commitments) + .context("Failed to serialize EpochCommitment-s")?, + }]) + } + } +} + +#[async_trait::async_trait] +impl Handler for KvEpochEnds { + const MIN_EAGER_ROWS: usize = 1; + + async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { + Ok(diesel::insert_into(kv_epoch_ends::table) + .values(values) + .on_conflict_do_nothing() + .execute(conn) + .await?) + } +} diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs new file mode 100644 index 0000000000000..d4ac53ebebaf4 --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs @@ -0,0 +1,77 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use anyhow::{bail, Context, Result}; +use diesel_async::RunQueryDsl; +use sui_types::{ + full_checkpoint_content::CheckpointData, + sui_system_state::{get_sui_system_state, SuiSystemStateTrait}, + transaction::{TransactionDataAPI, TransactionKind}, +}; + +use crate::{ + db, models::epochs::StoredEpochStart, pipeline::concurrent::Handler, pipeline::Processor, + schema::kv_epoch_starts, +}; + +pub struct KvEpochStarts; + +impl Processor for KvEpochStarts { + const NAME: &'static str = "kv_epoch_starts"; + + type Value = StoredEpochStart; + + fn process(&self, checkpoint: &Arc) -> Result> { + let CheckpointData { + checkpoint_summary, + transactions, + .. + } = checkpoint.as_ref(); + + // If this is the last checkpoint in the current epoch, it will contain enough information + // about the start of the next epoch. + if !checkpoint_summary.is_last_checkpoint_of_epoch() { + return Ok(vec![]); + } + + let Some(transaction) = transactions.iter().find(|tx| { + matches!( + tx.transaction.intent_message().value.kind(), + TransactionKind::ChangeEpoch(_) | TransactionKind::EndOfEpochTransaction(_) + ) + }) else { + bail!( + "Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData", + checkpoint_summary.sequence_number, + ); + }; + + let system_state = get_sui_system_state(&transaction.output_objects.as_slice()) + .context("Failed to find system state object output from end of epoch transaction")?; + + Ok(vec![StoredEpochStart { + epoch: system_state.epoch() as i64, + protocol_version: system_state.protocol_version() as i64, + cp_lo: checkpoint_summary.sequence_number as i64 + 1, + start_timestamp_ms: system_state.epoch_start_timestamp_ms() as i64, + reference_gas_price: system_state.reference_gas_price() as i64, + system_state: bcs::to_bytes(&system_state) + .context("Failed to serialize SystemState")?, + }]) + } +} + +#[async_trait::async_trait] +impl Handler for KvEpochStarts { + const MIN_EAGER_ROWS: usize = 1; + + async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { + Ok(diesel::insert_into(kv_epoch_starts::table) + .values(values) + .on_conflict_do_nothing() + .execute(conn) + .await?) + } +} diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index f3424c512c187..405308e5107d6 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -4,6 +4,8 @@ pub mod ev_emit_mod; pub mod ev_struct_inst; pub mod kv_checkpoints; +pub mod kv_epoch_ends; +pub mod kv_epoch_starts; pub mod kv_feature_flags; pub mod kv_objects; pub mod kv_protocol_configs; diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 73f4986729f8e..5ec607054177b 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -6,6 +6,8 @@ use clap::Parser; use sui_indexer_alt::args::Command; use sui_indexer_alt::bootstrap::bootstrap; use sui_indexer_alt::db::reset_database; +use sui_indexer_alt::handlers::kv_epoch_ends::KvEpochEnds; +use sui_indexer_alt::handlers::kv_epoch_starts::KvEpochStarts; use sui_indexer_alt::handlers::kv_feature_flags::KvFeatureFlags; use sui_indexer_alt::handlers::kv_protocol_configs::KvProtocolConfigs; use sui_indexer_alt::{ @@ -48,6 +50,8 @@ async fn main() -> Result<()> { indexer.concurrent_pipeline(EvEmitMod).await?; indexer.concurrent_pipeline(EvStructInst).await?; indexer.concurrent_pipeline(KvCheckpoints).await?; + indexer.concurrent_pipeline(KvEpochEnds).await?; + indexer.concurrent_pipeline(KvEpochStarts).await?; indexer.concurrent_pipeline(kv_feature_flags).await?; indexer.concurrent_pipeline(KvObjects).await?; indexer.concurrent_pipeline(kv_protocol_configs).await?; diff --git a/crates/sui-indexer-alt/src/models/epochs.rs b/crates/sui-indexer-alt/src/models/epochs.rs index 69212a54ec078..a07a4676e3243 100644 --- a/crates/sui-indexer-alt/src/models/epochs.rs +++ b/crates/sui-indexer-alt/src/models/epochs.rs @@ -4,7 +4,38 @@ use diesel::prelude::*; use sui_field_count::FieldCount; -use crate::schema::{kv_feature_flags, kv_protocol_configs}; +use crate::schema::{kv_epoch_ends, kv_epoch_starts, kv_feature_flags, kv_protocol_configs}; + +#[derive(Insertable, Debug, Clone, FieldCount)] +#[diesel(table_name = kv_epoch_ends)] +pub struct StoredEpochEnd { + pub epoch: i64, + pub cp_hi: i64, + pub tx_hi: i64, + pub end_timestamp_ms: i64, + pub safe_mode: bool, + pub total_stake: Option, + pub storage_fund_balance: Option, + pub storage_fund_reinvestment: Option, + pub storage_charge: Option, + pub storage_rebate: Option, + pub stake_subsidy_amount: Option, + pub total_gas_fees: Option, + pub total_stake_rewards_distributed: Option, + pub leftover_storage_fund_inflow: Option, + pub epoch_commitments: Vec, +} + +#[derive(Insertable, Debug, Clone, FieldCount)] +#[diesel(table_name = kv_epoch_starts)] +pub struct StoredEpochStart { + pub epoch: i64, + pub protocol_version: i64, + pub cp_lo: i64, + pub start_timestamp_ms: i64, + pub reference_gas_price: i64, + pub system_state: Vec, +} #[derive(Insertable, Debug, Clone, FieldCount)] #[diesel(table_name = kv_feature_flags)] diff --git a/crates/sui-indexer-alt/src/schema.rs b/crates/sui-indexer-alt/src/schema.rs index 721faf2599ad3..de37e5c137b9e 100644 --- a/crates/sui-indexer-alt/src/schema.rs +++ b/crates/sui-indexer-alt/src/schema.rs @@ -30,6 +30,37 @@ diesel::table! { } } +diesel::table! { + kv_epoch_ends (epoch) { + epoch -> Int8, + cp_hi -> Int8, + tx_hi -> Int8, + end_timestamp_ms -> Int8, + safe_mode -> Bool, + total_stake -> Nullable, + storage_fund_balance -> Nullable, + storage_fund_reinvestment -> Nullable, + storage_charge -> Nullable, + storage_rebate -> Nullable, + stake_subsidy_amount -> Nullable, + total_gas_fees -> Nullable, + total_stake_rewards_distributed -> Nullable, + leftover_storage_fund_inflow -> Nullable, + epoch_commitments -> Bytea, + } +} + +diesel::table! { + kv_epoch_starts (epoch) { + epoch -> Int8, + protocol_version -> Int8, + cp_lo -> Int8, + start_timestamp_ms -> Int8, + reference_gas_price -> Int8, + system_state -> Bytea, + } +} + diesel::table! { kv_feature_flags (protocol_version, flag_name) { protocol_version -> Int8, @@ -181,6 +212,8 @@ diesel::allow_tables_to_appear_in_same_query!( ev_emit_mod, ev_struct_inst, kv_checkpoints, + kv_epoch_ends, + kv_epoch_starts, kv_feature_flags, kv_genesis, kv_objects,