From 0eee79c59d4abd1edd59d78a6831915c22d4cfb8 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Fri, 19 Apr 2024 18:16:10 +0100 Subject: [PATCH] Add documentation for public APIs --- chaindexing/src/config.rs | 18 ++++++- chaindexing/src/contracts.rs | 17 ++++-- chaindexing/src/events/event.rs | 2 + chaindexing/src/handlers/pure_handler.rs | 2 + .../src/handlers/side_effect_handler.rs | 12 ++--- chaindexing/src/lib.rs | 20 ++++++- chaindexing/src/states/chain_state.rs | 7 +++ chaindexing/src/states/contract_state.rs | 7 +++ chaindexing/src/states/filters.rs | 54 ++++++++++++++++++- chaindexing/src/states/migrations.rs | 12 +++++ chaindexing/src/states/multi_chain_state.rs | 7 +++ chaindexing/src/states/updates.rs | 42 +++++++++++++++ 12 files changed, 187 insertions(+), 13 deletions(-) diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs index 5343445..ba8da56 100644 --- a/chaindexing/src/config.rs +++ b/chaindexing/src/config.rs @@ -45,6 +45,7 @@ impl OptimizationConfig { } } +/// Configuration for indexing states #[derive(Clone, Debug)] pub struct Config { pub chains: Vec, @@ -87,12 +88,14 @@ impl Config { } } + // Includes chain in config pub fn add_chain(mut self, chain: Chain) -> Self { self.chains.push(chain); self } + // Includes contract in config pub fn add_contract(mut self, contract: Contract) -> Self { self.contracts.push(contract); @@ -106,49 +109,61 @@ impl Config { self } + /// Restarts indexing from scratch for EventHandlers. SideEffectHandlers + /// will not run if they ran already pub fn reset(mut self, count: u64) -> Self { self.reset_count = count; self } + /// Restarts indexing from scratch for all Handlers. SideEffectHandlers + /// will RUN even if they ran already pub fn reset_including_side_effects_dangerously(mut self, count: u64) -> Self { self.reset_including_side_effects_count = count; self } + /// Defines the initial state for side effect handlers pub fn with_initial_state(mut self, initial_state: SharedState) -> Self { self.shared_state = Some(Arc::new(Mutex::new(initial_state))); self } + /// The minimum confirmation count for detecting chain-reorganizations or uncled blocks pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self { self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count); self } + /// Advance config: How many blocks per batch should be ingested and handled. + /// Default is 8_000 pub fn with_blocks_per_batch(mut self, blocks_per_batch: u64) -> Self { self.blocks_per_batch = blocks_per_batch; self } + /// Advance config: How often should the events handlers processes run. + /// Default is 4_000 pub fn with_handler_rate_ms(mut self, handler_rate_ms: u64) -> Self { self.handler_rate_ms = handler_rate_ms; self } + /// Advance config: How often should the events ingester processes run. + /// Default is 20_000 pub fn with_ingestion_rate_ms(mut self, ingestion_rate_ms: u64) -> Self { self.ingestion_rate_ms = ingestion_rate_ms; self } - // Configures number of chain batches to be processed concurrently + /// Configures number of chain batches to be processed concurrently pub fn with_chain_concurrency(mut self, chain_concurrency: u32) -> Self { self.chain_concurrency = chain_concurrency; @@ -167,6 +182,7 @@ impl Config { self } + /// Deletes stale events and related-internal data pub fn with_pruning(mut self) -> Self { self.pruning_config = Some(Default::default()); diff --git a/chaindexing/src/contracts.rs b/chaindexing/src/contracts.rs index 40fec8d..089573d 100644 --- a/chaindexing/src/contracts.rs +++ b/chaindexing/src/contracts.rs @@ -34,6 +34,7 @@ impl ContractEvent { pub type EventAbi = &'static str; +/// Represents the specification for a given contract. #[derive(Clone)] pub struct Contract { pub addresses: Vec, @@ -44,6 +45,12 @@ pub struct Contract { } impl Contract { + /// Builds the specification for a contract + /// + /// # Example + /// ``` + /// Contract::new("ERC20") + /// ``` pub fn new(name: &str) -> Self { Self { addresses: vec![], @@ -54,6 +61,7 @@ impl Contract { } } + /// Adds a contract address to a contract pub fn add_address( mut self, address: &str, @@ -70,12 +78,14 @@ impl Contract { self } + /// Adds an event handler pub fn add_event_handler(mut self, handler: impl EventHandler + 'static) -> Self { self.pure_handlers.insert(handler.abi(), Arc::new(handler)); self } + /// Adds a side-effect handler pub fn add_side_effect_handler( mut self, handler: impl SideEffectHandler + 'static, @@ -85,13 +95,14 @@ impl Contract { self } + /// Adds state migrations for the contract states being indexed pub fn add_state_migrations(mut self, state_migration: impl StateMigrations + 'static) -> Self { self.state_migrations.push(Arc::new(state_migration)); self } - pub fn get_event_abis(&self) -> Vec { + pub(crate) fn get_event_abis(&self) -> Vec { let mut event_abis: Vec<_> = self.pure_handlers.clone().into_keys().collect(); let side_effect_abis: Vec<_> = self.pure_handlers.clone().into_keys().collect(); @@ -101,14 +112,14 @@ impl Contract { event_abis } - pub fn get_event_topics(&self) -> Vec { + pub(crate) fn get_event_topics(&self) -> Vec { self.get_event_abis() .iter() .map(|abi| HumanReadableParser::parse_event(abi).unwrap().signature()) .collect() } - pub fn build_events(&self) -> Vec { + pub(crate) fn build_events(&self) -> Vec { self.get_event_abis().iter().map(|abi| ContractEvent::new(abi)).collect() } } diff --git a/chaindexing/src/events/event.rs b/chaindexing/src/events/event.rs index e4031b6..9d1e49a 100644 --- a/chaindexing/src/events/event.rs +++ b/chaindexing/src/events/event.rs @@ -12,6 +12,8 @@ use uuid::Uuid; use serde::Deserialize; +/// An event aka provider logs are emitted from smart contracts +/// to help infer their state #[derive(Debug, Deserialize, Clone, Eq, Queryable, Insertable)] #[diesel(table_name = chaindexing_events)] pub struct Event { diff --git a/chaindexing/src/handlers/pure_handler.rs b/chaindexing/src/handlers/pure_handler.rs index 95651fc..1aa65d6 100644 --- a/chaindexing/src/handlers/pure_handler.rs +++ b/chaindexing/src/handlers/pure_handler.rs @@ -8,6 +8,8 @@ use crate::{ChaindexingRepoClient, ChaindexingRepoTxnClient, EventParam}; use super::handler_context::HandlerContext; +/// Pure handlers do not contain any side effects. They are simple reducers +/// that derive or index states deterministically. #[async_trait::async_trait] pub trait PureHandler: Send + Sync { /// The human-readable ABI of the event being handled. diff --git a/chaindexing/src/handlers/side_effect_handler.rs b/chaindexing/src/handlers/side_effect_handler.rs index c40dbb0..a7d2290 100644 --- a/chaindexing/src/handlers/side_effect_handler.rs +++ b/chaindexing/src/handlers/side_effect_handler.rs @@ -10,6 +10,12 @@ use super::handler_context::HandlerContext; #[async_trait::async_trait] +/// SideEffectHandlers are event handlers that help handle side-effects for events. +/// This is useful for handling events only ONCE and can rely on a non-deterministic +/// shared state. Some use-cases are notifications, bridging etc. Chaindexing ensures +/// that the side-effect handlers are called once immutable regardless of resets. +/// However, one can dangerously reset including side effects with the new `reset_including_side_effects` +/// Config API. pub trait SideEffectHandler: Send + Sync { type SharedState: Send + Sync + Clone + Debug; @@ -21,12 +27,6 @@ pub trait SideEffectHandler: Send + Sync { async fn handle_event<'a>(&self, context: SideEffectHandlerContext<'a, Self::SharedState>); } -// SideEffectHandlers are event handlers that help handle side-effects for events. -// This is useful for handling events only ONCE and can rely on a non-deterministic -// shared state. Some use-cases are notifications, bridging etc. Chaindexing ensures -// that the side-effect handlers are called once immutable regardless of resets. -// However, one can dangerously reset including side effects with the new `reset_including_side_effects` -// Config API. #[derive(Clone)] pub struct SideEffectHandlerContext<'a, SharedState: Sync + Send + Clone> { pub event: Event, diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs index 4bb1d7c..37b7015 100644 --- a/chaindexing/src/lib.rs +++ b/chaindexing/src/lib.rs @@ -62,7 +62,7 @@ use nodes::NodeTasks; use crate::nodes::{NodeTask, NodeTasksRunner}; -pub type ChaindexingRepoClientMutex = Arc>; +pub(crate) type ChaindexingRepoClientMutex = Arc>; pub enum ChaindexingError { Config(ConfigError), @@ -84,6 +84,7 @@ impl Debug for ChaindexingError { } } +/// Starts processes to ingest and index states as configured pub async fn index_states( config: &Config, ) -> Result<(), ChaindexingError> { @@ -128,6 +129,23 @@ pub async fn index_states( Ok(()) } +/// Includes runtime-discovered contract addresses for indexing +/// +/// # Arguments +/// +/// * `event_context` - context where the contract was discovered. Indexing starts +/// from this point onwards +/// * `name` - name of the contract specification as defined in the config +/// * `address` - address of the contract +/// +/// # Example +/// +/// ```ignore +/// // In an EventHandler... +/// chaindexing::include_contract(&context, "UniswapV3Pool", &pool_contract_address) +/// .await; +/// // Includes a new UniswapV3Pool contract:{pool_contract_address} for indexing... +/// ``` pub async fn include_contract<'a, C: handlers::HandlerContext<'a>>( event_context: &C, contract_name: &str, diff --git a/chaindexing/src/states/chain_state.rs b/chaindexing/src/states/chain_state.rs index c50f792..45d386d 100644 --- a/chaindexing/src/states/chain_state.rs +++ b/chaindexing/src/states/chain_state.rs @@ -13,22 +13,28 @@ use super::updates::Updates; use serde::de::DeserializeOwned; use serde::Serialize; +/// States derived from different contracts within a chain #[async_trait::async_trait] pub trait ChainState: DeserializeOwned + Serialize + Clone + Debug + Sync + Send + 'static { + /// Table of the state as specified in StateMigrations fn table_name() -> &'static str; + /// Inserts state in the state's table async fn create<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) { state::create(Self::table_name(), &state::to_view(self), context).await; } + /// Returns a single state matching filters. Panics if there are multiple. async fn read_one<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Option { Self::read_many(filters, context).await.first().cloned() } + /// Returns states matching filters async fn read_many<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Vec { read_many(filters, context, Self::table_name()).await } + /// Updates state with the specified updates async fn update<'a, 'b>(&self, updates: &Updates, context: &PureHandlerContext<'a, 'b>) { let event = &context.event; let client = context.repo_client; @@ -41,6 +47,7 @@ pub trait ChainState: DeserializeOwned + Serialize + Clone + Debug + Sync + Send StateView::refresh(&latest_state_version, table_name, client).await; } + /// Deletes state from the state's table async fn delete<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) { let event = &context.event; let client = context.repo_client; diff --git a/chaindexing/src/states/contract_state.rs b/chaindexing/src/states/contract_state.rs index 1caf961..f83ae15 100644 --- a/chaindexing/src/states/contract_state.rs +++ b/chaindexing/src/states/contract_state.rs @@ -13,24 +13,30 @@ use super::updates::Updates; use serde::de::DeserializeOwned; use serde::Serialize; +/// States derived from a contract #[async_trait::async_trait] pub trait ContractState: DeserializeOwned + Serialize + Clone + Debug + Sync + Send + 'static { + /// Table of the state as specified in StateMigrations fn table_name() -> &'static str; + /// Inserts state in the state's table async fn create<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) { state::create(Self::table_name(), &state::to_view(self), context).await; } + /// Returns a single state matching filters. Panics if there are multiple. async fn read_one<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Option { Self::read_many(filters, context).await.first().cloned() } + /// Returns states matching filters async fn read_many<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Vec { read_many(filters, context, Self::table_name()).await } + /// Updates state with the specified updates async fn update<'a, 'b>(&self, updates: &Updates, context: &PureHandlerContext<'a, 'b>) { let event = &context.event; let client = context.repo_client; @@ -43,6 +49,7 @@ pub trait ContractState: StateView::refresh(&latest_state_version, table_name, client).await; } + /// Deletes state from the state's table async fn delete<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) { let event = &context.event; let client = context.repo_client; diff --git a/chaindexing/src/states/filters.rs b/chaindexing/src/states/filters.rs index 3365870..3c0ed4d 100644 --- a/chaindexing/src/states/filters.rs +++ b/chaindexing/src/states/filters.rs @@ -9,34 +9,84 @@ enum FiltersContext { MultiChain, } +/// Represents a set of filters used for querying data. #[derive(Clone, Debug)] pub struct Filters { - values: HashMap, - context: FiltersContext, + values: HashMap, // A map of filter field names to their values. + context: FiltersContext, // The context in which the filters are applied. } impl Filters { + /// Creates a new Filters instance with a single filter. + /// + /// # Arguments + /// + /// * `field` - The field name of the filter. + /// * `value` - The value of the filter. + /// + /// # Example + /// + /// ```ignore + /// let filters = Filters::new("token_id", token_id); + /// Nft::read_one(&filters, &context); + /// ``` pub fn new(field: impl ToString, value: impl ToString) -> Self { Self { values: HashMap::from([(field.to_string(), value.to_string())]), context: FiltersContext::Contract, } } + + /// Adds a new filter to the existing set of filters by moving the + /// original filters + /// + /// # Arguments + /// + /// * `field` - The field name of the filter. + /// * `value` - The value of the filter. + /// + /// # Example + /// + /// ``` + /// Filters::new("address", address).add("token_id", token_id); // filters is moved + /// ``` pub fn add(mut self, field: impl ToString, value: impl ToString) -> Self { self.add_mut(field, value); self } + + /// Adds a new filter to the existing set of filters without moving + /// the original filters + /// + /// # Arguments + /// + /// * `field` - The field name of the filter. + /// * `value` - The value of the filter. + /// + /// # Example + /// + /// ``` + /// let mut filters = Filters::new("address", address); + /// + /// filters.add_mut("token_id", token_id); // filters not moved + /// ``` pub fn add_mut(&mut self, field: impl ToString, value: impl ToString) { self.values.insert(field.to_string(), value.to_string()); } + + /// Sets the context of the filters to Contract pub fn within_contract(mut self) -> Self { self.context = FiltersContext::Contract; self } + + /// Sets the context of the filters to Chain pub fn within_chain(mut self) -> Self { self.context = FiltersContext::Chain; self } + + /// Sets the context of the filters to MultiChain pub fn within_multi_chain(mut self) -> Self { self.context = FiltersContext::MultiChain; self diff --git a/chaindexing/src/states/migrations.rs b/chaindexing/src/states/migrations.rs index 4503c92..817e572 100644 --- a/chaindexing/src/states/migrations.rs +++ b/chaindexing/src/states/migrations.rs @@ -6,6 +6,18 @@ use super::STATE_VERSIONS_TABLE_PREFIX; // easen the type strictness for consumer applications. // Trait/Callback? this way, consumer apps can statically visualize their migrations pub trait StateMigrations: Send + Sync { + /// SQL migrations for the state to index. These migrations must be idempotent + /// and will require using the 'IF NOT EXISTS` check + /// + /// # Example + /// ```ignore + /// fn migrations(&self) -> &'static [&'static str] { + /// &["CREATE TABLE IF NOT EXISTS nfts ( + /// token_id INTEGER NOT NULL, + /// owner_address TEXT NOT NULL + /// )"] + /// } + /// ``` fn migrations(&self) -> &'static [&'static str]; fn get_table_names(&self) -> Vec { diff --git a/chaindexing/src/states/multi_chain_state.rs b/chaindexing/src/states/multi_chain_state.rs index bf3e706..2cc7d1a 100644 --- a/chaindexing/src/states/multi_chain_state.rs +++ b/chaindexing/src/states/multi_chain_state.rs @@ -12,25 +12,31 @@ use super::updates::Updates; use serde::de::DeserializeOwned; use serde::Serialize; +/// States derived from different contracts across different chains /// N/B: Indexing MultiChainStates must be Order-Agnostic #[async_trait::async_trait] pub trait MultiChainState: DeserializeOwned + Serialize + Clone + Debug + Sync + Send + 'static { + /// Table of the state as specified in StateMigrations fn table_name() -> &'static str; + /// Inserts state in the state's table async fn create<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) { state::create(Self::table_name(), &state::to_view(self), context).await; } + /// Returns a single state matching filters. Panics if there are multiple. async fn read_one<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Option { Self::read_many(filters, context).await.first().cloned() } + /// Returns states matching filters async fn read_many<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Vec { read_many(filters, context, Self::table_name()).await } + /// Updates state with the specified updates async fn update<'a, 'b>(&self, updates: &Updates, context: &PureHandlerContext<'a, 'b>) { let event = context.event.clone(); let client = context.repo_client; @@ -57,6 +63,7 @@ pub trait MultiChainState: .await; } + /// Deletes state from the state's table async fn delete<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) { let event = context.event.clone(); let client = context.repo_client; diff --git a/chaindexing/src/states/updates.rs b/chaindexing/src/states/updates.rs index 86dd30d..f7be3d6 100644 --- a/chaindexing/src/states/updates.rs +++ b/chaindexing/src/states/updates.rs @@ -1,20 +1,62 @@ use std::{collections::HashMap, fmt::Debug}; +/// Represents the fields to be updated in a state #[derive(Clone, Debug)] pub struct Updates { pub(super) values: HashMap, } impl Updates { + /// Creates a new Updates instance with a single filter. + /// + /// # Arguments + /// + /// * `field` - The field name to update. + /// * `value` - new value of field + /// + /// # Example + /// + /// ```ignore + /// let updates = Updates::new("owner_address", token_id); + /// nft_state.update(&updates, &context).await; + /// ``` pub fn new(field: impl ToString, value: impl ToString) -> Self { Self { values: HashMap::from([(field.to_string(), value.to_string())]), } } + /// Adds a new update to the existing set of updates by moving the + /// original updates + /// + /// # Arguments + /// + /// * `field` - The field name to update. + /// * `value` - new value of field + /// + /// # Example + /// + /// ``` + /// Updates::new("address", address).add("token_id", token_id); // updates is moved + /// ``` pub fn add(mut self, field: impl ToString, value: impl ToString) -> Self { self.add_mut(field, value); self } + /// Adds a new update to the existing set of updates without moving + /// the original updates + /// + /// # Arguments + /// + /// * `field` - The field name to update. + /// * `value` - new value of field + /// + /// # Example + /// + /// ``` + /// let mut updates = Updates::new("address", address); + /// + /// updates.add_mut("token_id", token_id);// updates not moved + /// ``` pub fn add_mut(&mut self, field: impl ToString, value: impl ToString) { self.values.insert(field.to_string(), value.to_string()); }