Skip to content

Commit

Permalink
Add documentation for public APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jurshsmith committed Apr 19, 2024
1 parent 53838dc commit 164d5c0
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 13 deletions.
18 changes: 17 additions & 1 deletion chaindexing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl OptimizationConfig {
}
}

/// Configuration for indexing states
#[derive(Clone, Debug)]
pub struct Config<SharedState: Sync + Send + Clone> {
pub chains: Vec<Chain>,
Expand Down Expand Up @@ -87,12 +88,14 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
}
}

// Includes chain in config
pub fn add_chain(mut self, chain: Chain) -> Self {
self.chains.push(chain);

self
}

// Includes contract in config
pub fn add_contract(mut self, contract: Contract<SharedState>) -> Self {
self.contracts.push(contract);

Expand All @@ -106,49 +109,61 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
self
}

/// Restarts indexing from scratch for EventHandlers. SideEffectHandlers
/// will not run if they ran already
pub fn reset(mut self, count: u64) -> Self {
self.reset_count = count;

self
}

/// Restarts indexing from scratch for all Handlers. SideEffectHandlers
/// will RUN even if they ran already
pub fn reset_including_side_effects_dangerously(mut self, count: u64) -> Self {
self.reset_including_side_effects_count = count;

self
}

/// Defines the initial state for side effect handlers
pub fn with_initial_state(mut self, initial_state: SharedState) -> Self {
self.shared_state = Some(Arc::new(Mutex::new(initial_state)));

self
}

/// The minimum confirmation count for detecting chain-reorganizations or uncled blocks
pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self {
self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count);

self
}

/// Advance config: How many blocks per batch should be ingested and handled.
/// Default is 8_000
pub fn with_blocks_per_batch(mut self, blocks_per_batch: u64) -> Self {
self.blocks_per_batch = blocks_per_batch;

self
}

/// Advance config: How often should the events handlers processes run.
/// Default is 4_000
pub fn with_handler_rate_ms(mut self, handler_rate_ms: u64) -> Self {
self.handler_rate_ms = handler_rate_ms;

self
}

/// Advance config: How often should the events ingester processes run.
/// Default is 20_000
pub fn with_ingestion_rate_ms(mut self, ingestion_rate_ms: u64) -> Self {
self.ingestion_rate_ms = ingestion_rate_ms;

self
}

// Configures number of chain batches to be processed concurrently
/// Configures number of chain batches to be processed concurrently
pub fn with_chain_concurrency(mut self, chain_concurrency: u32) -> Self {
self.chain_concurrency = chain_concurrency;

Expand All @@ -167,6 +182,7 @@ impl<SharedState: Sync + Send + Clone> Config<SharedState> {
self
}

/// Deletes stale events and related-internal data
pub fn with_pruning(mut self) -> Self {
self.pruning_config = Some(Default::default());

Expand Down
19 changes: 16 additions & 3 deletions chaindexing/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl ContractEvent {

pub type EventAbi = &'static str;

/// Represents the specification for a given contract.
#[derive(Clone)]
pub struct Contract<S: Send + Sync + Clone> {
pub addresses: Vec<UnsavedContractAddress>,
Expand All @@ -44,6 +45,14 @@ pub struct Contract<S: Send + Sync + Clone> {
}

impl<S: Send + Sync + Clone> Contract<S> {
/// Builds the specification for a contract
///
/// # Example
/// ```
/// use chaindexing::Contract
///
/// Contract::new("ERC20")
/// ```
pub fn new(name: &str) -> Self {
Self {
addresses: vec![],
Expand All @@ -54,6 +63,7 @@ impl<S: Send + Sync + Clone> Contract<S> {
}
}

/// Adds a contract address to a contract
pub fn add_address(
mut self,
address: &str,
Expand All @@ -70,12 +80,14 @@ impl<S: Send + Sync + Clone> Contract<S> {
self
}

/// Adds an event handler
pub fn add_event_handler(mut self, handler: impl EventHandler + 'static) -> Self {
self.pure_handlers.insert(handler.abi(), Arc::new(handler));

self
}

/// Adds a side-effect handler
pub fn add_side_effect_handler(
mut self,
handler: impl SideEffectHandler<SharedState = S> + 'static,
Expand All @@ -85,13 +97,14 @@ impl<S: Send + Sync + Clone> Contract<S> {
self
}

/// Adds state migrations for the contract states being indexed
pub fn add_state_migrations(mut self, state_migration: impl StateMigrations + 'static) -> Self {
self.state_migrations.push(Arc::new(state_migration));

self
}

pub fn get_event_abis(&self) -> Vec<EventAbi> {
pub(crate) fn get_event_abis(&self) -> Vec<EventAbi> {
let mut event_abis: Vec<_> = self.pure_handlers.clone().into_keys().collect();
let side_effect_abis: Vec<_> = self.pure_handlers.clone().into_keys().collect();

Expand All @@ -101,14 +114,14 @@ impl<S: Send + Sync + Clone> Contract<S> {
event_abis
}

pub fn get_event_topics(&self) -> Vec<ContractEventTopic> {
pub(crate) fn get_event_topics(&self) -> Vec<ContractEventTopic> {
self.get_event_abis()
.iter()
.map(|abi| HumanReadableParser::parse_event(abi).unwrap().signature())
.collect()
}

pub fn build_events(&self) -> Vec<ContractEvent> {
pub(crate) fn build_events(&self) -> Vec<ContractEvent> {
self.get_event_abis().iter().map(|abi| ContractEvent::new(abi)).collect()
}
}
Expand Down
2 changes: 2 additions & 0 deletions chaindexing/src/events/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use uuid::Uuid;

use serde::Deserialize;

/// An event aka provider logs are emitted from smart contracts
/// to help infer their state
#[derive(Debug, Deserialize, Clone, Eq, Queryable, Insertable)]
#[diesel(table_name = chaindexing_events)]
pub struct Event {
Expand Down
2 changes: 2 additions & 0 deletions chaindexing/src/handlers/pure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::{ChaindexingRepoClient, ChaindexingRepoTxnClient, EventParam};

use super::handler_context::HandlerContext;

/// Pure handlers do not contain any side effects. They are simple reducers
/// that derive or index states deterministically.
#[async_trait::async_trait]
pub trait PureHandler: Send + Sync {
/// The human-readable ABI of the event being handled.
Expand Down
12 changes: 6 additions & 6 deletions chaindexing/src/handlers/side_effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ use super::handler_context::HandlerContext;

#[async_trait::async_trait]

/// SideEffectHandlers are event handlers that help handle side-effects for events.
/// This is useful for handling events only ONCE and can rely on a non-deterministic
/// shared state. Some use-cases are notifications, bridging etc. Chaindexing ensures
/// that the side-effect handlers are called once immutable regardless of resets.
/// However, one can dangerously reset including side effects with the new `reset_including_side_effects`
/// Config API.
pub trait SideEffectHandler: Send + Sync {
type SharedState: Send + Sync + Clone + Debug;

Expand All @@ -21,12 +27,6 @@ pub trait SideEffectHandler: Send + Sync {
async fn handle_event<'a>(&self, context: SideEffectHandlerContext<'a, Self::SharedState>);
}

// SideEffectHandlers are event handlers that help handle side-effects for events.
// This is useful for handling events only ONCE and can rely on a non-deterministic
// shared state. Some use-cases are notifications, bridging etc. Chaindexing ensures
// that the side-effect handlers are called once immutable regardless of resets.
// However, one can dangerously reset including side effects with the new `reset_including_side_effects`
// Config API.
#[derive(Clone)]
pub struct SideEffectHandlerContext<'a, SharedState: Sync + Send + Clone> {
pub event: Event,
Expand Down
20 changes: 19 additions & 1 deletion chaindexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use nodes::NodeTasks;

use crate::nodes::{NodeTask, NodeTasksRunner};

pub type ChaindexingRepoClientMutex = Arc<Mutex<PostgresRepoClient>>;
pub(crate) type ChaindexingRepoClientMutex = Arc<Mutex<PostgresRepoClient>>;

pub enum ChaindexingError {
Config(ConfigError),
Expand All @@ -84,6 +84,7 @@ impl Debug for ChaindexingError {
}
}

/// Starts processes to ingest and index states as configured
pub async fn index_states<S: Send + Sync + Clone + Debug + 'static>(
config: &Config<S>,
) -> Result<(), ChaindexingError> {
Expand Down Expand Up @@ -128,6 +129,23 @@ pub async fn index_states<S: Send + Sync + Clone + Debug + 'static>(
Ok(())
}

/// Includes runtime-discovered contract addresses for indexing
///
/// # Arguments
///
/// * `event_context` - context where the contract was discovered. Indexing starts
/// from this point onwards
/// * `name` - name of the contract specification as defined in the config
/// * `address` - address of the contract
///
/// # Example
///
/// ```ignore
/// // In an EventHandler...
/// chaindexing::include_contract(&context, "UniswapV3Pool", &pool_contract_address)
/// .await;
/// // Includes a new UniswapV3Pool contract:{pool_contract_address} for indexing...
/// ```
pub async fn include_contract<'a, C: handlers::HandlerContext<'a>>(
event_context: &C,
contract_name: &str,
Expand Down
7 changes: 7 additions & 0 deletions chaindexing/src/states/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,28 @@ use super::updates::Updates;
use serde::de::DeserializeOwned;
use serde::Serialize;

/// States derived from different contracts within a chain
#[async_trait::async_trait]
pub trait ChainState: DeserializeOwned + Serialize + Clone + Debug + Sync + Send + 'static {
/// Table of the state as specified in StateMigrations
fn table_name() -> &'static str;

/// Inserts state in the state's table
async fn create<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) {
state::create(Self::table_name(), &state::to_view(self), context).await;
}

/// Returns a single state matching filters. Panics if there are multiple.
async fn read_one<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Option<Self> {
Self::read_many(filters, context).await.first().cloned()
}

/// Returns states matching filters
async fn read_many<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Vec<Self> {
read_many(filters, context, Self::table_name()).await
}

/// Updates state with the specified updates
async fn update<'a, 'b>(&self, updates: &Updates, context: &PureHandlerContext<'a, 'b>) {
let event = &context.event;
let client = context.repo_client;
Expand All @@ -41,6 +47,7 @@ pub trait ChainState: DeserializeOwned + Serialize + Clone + Debug + Sync + Send
StateView::refresh(&latest_state_version, table_name, client).await;
}

/// Deletes state from the state's table
async fn delete<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) {
let event = &context.event;
let client = context.repo_client;
Expand Down
7 changes: 7 additions & 0 deletions chaindexing/src/states/contract_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,30 @@ use super::updates::Updates;
use serde::de::DeserializeOwned;
use serde::Serialize;

/// States derived from a contract
#[async_trait::async_trait]
pub trait ContractState:
DeserializeOwned + Serialize + Clone + Debug + Sync + Send + 'static
{
/// Table of the state as specified in StateMigrations
fn table_name() -> &'static str;

/// Inserts state in the state's table
async fn create<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) {
state::create(Self::table_name(), &state::to_view(self), context).await;
}

/// Returns a single state matching filters. Panics if there are multiple.
async fn read_one<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Option<Self> {
Self::read_many(filters, context).await.first().cloned()
}

/// Returns states matching filters
async fn read_many<'a, C: HandlerContext<'a>>(filters: &Filters, context: &C) -> Vec<Self> {
read_many(filters, context, Self::table_name()).await
}

/// Updates state with the specified updates
async fn update<'a, 'b>(&self, updates: &Updates, context: &PureHandlerContext<'a, 'b>) {
let event = &context.event;
let client = context.repo_client;
Expand All @@ -43,6 +49,7 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

/// Deletes state from the state's table
async fn delete<'a, 'b>(&self, context: &PureHandlerContext<'a, 'b>) {
let event = &context.event;
let client = context.repo_client;
Expand Down
Loading

0 comments on commit 164d5c0

Please sign in to comment.