Skip to content

Commit

Permalink
Introduce SharedState In EventHandlers (Breaking Change)
Browse files Browse the repository at this point in the history
A shared state is exposed in event handlers to allow performing
side effects using the app's state or resources. A typical example
would be to store the DBPool for a web app in the shared state to
allow doing some DB related logic owned by your application.

When there is no shared state required for indexing, the user can simply
pass None in the `Config::new` build fn and `NoSharedState` in the
associated type for the event handlers.
  • Loading branch information
Jurshsmith committed Dec 23, 2023
1 parent 549dfb8 commit 5134b9a
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 77 deletions.
4 changes: 2 additions & 2 deletions chaindexing-tests/src/factory/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chaindexing::{Chain, Contract};
use chaindexing::{Chain, Contract, NoSharedState};

use super::{ApprovalForAllTestEventHandler, TransferTestEventHandler};

Expand All @@ -10,7 +10,7 @@ pub const APPROCAL_EVENT_ABI: &str =

pub const BAYC_CONTRACT_ADDRESS: &str = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D";
pub const BAYC_CONTRACT_START_BLOCK_NUMBER: u32 = 17773490;
pub fn bayc_contract() -> Contract {
pub fn bayc_contract() -> Contract<NoSharedState> {
Contract::new("BoredApeYachtClub")
.add_event(TRANSFER_EVENT_ABI, TransferTestEventHandler)
.add_event(APPROCAL_EVENT_ABI, ApprovalForAllTestEventHandler)
Expand Down
10 changes: 7 additions & 3 deletions chaindexing-tests/src/factory/event_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chaindexing::{EventContext, EventHandler};
use chaindexing::{EventContext, EventHandler, NoSharedState};

#[derive(Clone, Debug)]
pub struct NftState;
Expand All @@ -7,12 +7,16 @@ pub struct TransferTestEventHandler;

#[async_trait::async_trait]
impl EventHandler for TransferTestEventHandler {
async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {}
type SharedState = NoSharedState;

async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {}
}

pub struct ApprovalForAllTestEventHandler;

#[async_trait::async_trait]
impl EventHandler for ApprovalForAllTestEventHandler {
async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {}
type SharedState = NoSharedState;

async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {}
}
4 changes: 2 additions & 2 deletions chaindexing-tests/src/factory/events.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;

use chaindexing::{Contract, Event, Events};
use chaindexing::{Contract, Event, Events, NoSharedState};
use ethers::types::Block;

use super::{transfer_log, BAYC_CONTRACT_ADDRESS};

pub fn transfer_event_with_contract(contract: Contract) -> Event {
pub fn transfer_event_with_contract(contract: Contract<NoSharedState>) -> Event {
let contract_address = BAYC_CONTRACT_ADDRESS;
let transfer_log = transfer_log(contract_address);
let blocks_by_number = HashMap::from([(
Expand Down
2 changes: 1 addition & 1 deletion chaindexing-tests/src/factory/json_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ macro_rules! json_rpc_with_logs {
json_rpc_with_logs!($contract_address, 17774490)
}};
($contract_address:expr, $current_block_number:expr) => {{
use crate::factory::transfer_log;
use chaindexing::EventsIngesterJsonRpc;
use ethers::providers::ProviderError;
use ethers::types::{Block, Filter, Log, TxHash, U64};
use $crate::factory::transfer_log;

#[derive(Clone)]
struct JsonRpc;
Expand Down
11 changes: 7 additions & 4 deletions chaindexing-tests/src/tests/contract_states.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient};
use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient, NoSharedState};

use super::*;
use crate::factory::{bayc_contract, transfer_event_with_contract};
Expand All @@ -12,9 +12,10 @@ mod tests {
let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await;
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await;
let event_context = EventContext::new(
let event_context: EventContext<'_, NoSharedState> = EventContext::new(
transfer_event_with_contract(bayc_contract),
&raw_query_txn_client,
None,
);

let new_state = NftState { token_id: 2 };
Expand All @@ -37,9 +38,10 @@ mod tests {
let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await;
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await;
let event_context = EventContext::new(
let event_context: EventContext<'_, NoSharedState> = EventContext::new(
transfer_event_with_contract(bayc_contract),
&raw_query_txn_client,
None,
);

let new_state = NftState { token_id: 1 };
Expand Down Expand Up @@ -68,9 +70,10 @@ mod tests {
let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await;
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await;
let event_context = EventContext::new(
let event_context: EventContext<'_, NoSharedState> = EventContext::new(
transfer_event_with_contract(bayc_contract),
&raw_query_txn_client,
None,
);

let new_state = NftState { token_id: 9 };
Expand Down
5 changes: 3 additions & 2 deletions chaindexing-tests/src/tests/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ mod tests {
json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner,
};
use chaindexing::{
Chain, ChaindexingRepo, EventsIngester, HasRawQueryClient, MinConfirmationCount, Repo,
Chain, ChaindexingRepo, Contract, EventsIngester, HasRawQueryClient, MinConfirmationCount,
NoSharedState, Repo,
};

#[tokio::test]
Expand Down Expand Up @@ -151,7 +152,7 @@ mod tests {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |conn| async move {
let contracts = vec![];
let contracts: Vec<Contract<NoSharedState>> = vec![];
let json_rpc = Arc::new(empty_json_rpc());
let blocks_per_batch = 10;
let conn = Arc::new(Mutex::new(conn));
Expand Down
15 changes: 9 additions & 6 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use ethers::types::Chain;
use tokio::sync::Mutex;

use crate::{ChaindexingRepo, Chains, Contract, MinConfirmationCount};

Expand All @@ -22,19 +23,20 @@ impl std::fmt::Debug for ConfigError {
}
}
#[derive(Clone)]
pub struct Config {
pub struct Config<SharedState: Sync + Send + Clone> {
pub chains: Chains,
pub repo: ChaindexingRepo,
pub contracts: Vec<Contract>,
pub contracts: Vec<Contract<SharedState>>,
pub min_confirmation_count: MinConfirmationCount,
pub blocks_per_batch: u64,
pub handler_rate_ms: u64,
pub ingestion_rate_ms: u64,
pub reset_count: u8,
pub shared_state: Option<Arc<Mutex<SharedState>>>,
}

impl Config {
pub fn new(repo: ChaindexingRepo) -> Self {
impl<SharedState: Sync + Send + Clone> Config<SharedState> {
pub fn new(repo: ChaindexingRepo, initial_state: Option<SharedState>) -> Self {
Self {
repo,
chains: HashMap::new(),
Expand All @@ -44,6 +46,7 @@ impl Config {
handler_rate_ms: 4000,
ingestion_rate_ms: 4000,
reset_count: 0,
shared_state: initial_state.map(|s| Arc::new(Mutex::new(s))),
}
}

Expand All @@ -53,7 +56,7 @@ impl Config {
self
}

pub fn add_contract(mut self, contract: Contract) -> Self {
pub fn add_contract(mut self, contract: Contract<SharedState>) -> Self {
self.contracts.push(contract);

self
Expand Down
18 changes: 11 additions & 7 deletions chaindexing/src/contract_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait ContractState:
StateView::get_complete(&view, table_name, client).await
}

async fn create<'a>(&self, context: &EventHandlerContext) {
async fn create<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext<S>) {
let event = &context.event;
let client = context.get_raw_query_client();

Expand All @@ -87,7 +87,11 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

async fn update<'a>(&self, updates: HashMap<String, String>, context: &EventHandlerContext) {
async fn update<'a, S: Send + Sync + Clone>(
&self,
updates: HashMap<String, String>,
context: &EventHandlerContext<S>,
) {
let event = &context.event;
let client = context.get_raw_query_client();

Expand All @@ -99,7 +103,7 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

async fn delete<'a>(&self, context: &EventHandlerContext) {
async fn delete<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext<S>) {
let event = &context.event;
let client = context.get_raw_query_client();

Expand All @@ -111,18 +115,18 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

async fn read_one<'a>(
async fn read_one<'a, S: Send + Sync + Clone>(
filters: HashMap<String, String>,
context: &EventHandlerContext,
context: &EventHandlerContext<S>,
) -> Option<Self> {
let states = Self::read_many(filters, context).await;

states.first().cloned()
}

async fn read_many<'a>(
async fn read_many<'a, S: Send + Sync + Clone>(
filters: HashMap<String, String>,
context: &EventHandlerContext,
context: &EventHandlerContext<S>,
) -> Vec<Self> {
let client = context.get_raw_query_client();

Expand Down
32 changes: 17 additions & 15 deletions chaindexing/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ impl ContractEvent {
type EventAbi = &'static str;

#[derive(Clone)]
pub struct Contract {
pub struct Contract<S: Send + Sync + Clone> {
pub addresses: Vec<UnsavedContractAddress>,
pub name: String,
pub event_handlers: HashMap<EventAbi, Arc<dyn EventHandler>>,
pub event_handlers: HashMap<EventAbi, Arc<dyn EventHandler<SharedState = S>>>,
pub state_migrations: Vec<Arc<dyn ContractStateMigrations>>,
}

impl Contract {
impl<S: Send + Sync + Clone> Contract<S> {
pub fn new(name: &str) -> Self {
Self {
addresses: vec![],
Expand All @@ -47,7 +47,7 @@ impl Contract {
}
}

pub fn add_address(&self, address: &str, chain: &Chain, start_block_number: i64) -> Self {
pub fn add_address(&mut self, address: &str, chain: &Chain, start_block_number: i64) -> Self {
let mut addresses = self.addresses.clone();

addresses.push(UnsavedContractAddress::new(
Expand All @@ -66,7 +66,7 @@ impl Contract {
pub fn add_event(
mut self,
event_abi: EventAbi,
event_handler: impl EventHandler + 'static,
event_handler: impl EventHandler<SharedState = S> + 'static,
) -> Self {
self.event_handlers.insert(event_abi, Arc::new(event_handler));

Expand Down Expand Up @@ -101,13 +101,15 @@ impl Contract {
pub struct Contracts;

impl Contracts {
pub fn get_state_migrations(contracts: &[Contract]) -> Vec<Arc<dyn ContractStateMigrations>> {
pub fn get_state_migrations<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> Vec<Arc<dyn ContractStateMigrations>> {
contracts.iter().flat_map(|c| c.state_migrations.clone()).collect()
}

pub fn get_all_event_handlers_by_event_abi(
contracts: &[Contract],
) -> HashMap<EventAbi, Arc<dyn EventHandler>> {
pub fn get_all_event_handlers_by_event_abi<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<EventAbi, Arc<dyn EventHandler<SharedState = S>>> {
contracts.iter().fold(
HashMap::new(),
|mut event_handlers_by_event_abi, contract| {
Expand All @@ -120,8 +122,8 @@ impl Contracts {
)
}

pub fn group_event_topics_by_names(
contracts: &[Contract],
pub fn group_event_topics_by_names<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<String, Vec<ContractEventTopic>> {
contracts.iter().fold(HashMap::new(), |mut topics_by_contract_name, contract| {
topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics());
Expand All @@ -130,8 +132,8 @@ impl Contracts {
})
}

pub fn group_events_by_topics(
contracts: &[Contract],
pub fn group_events_by_topics<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<ContractEventTopic, ContractEvent> {
contracts
.iter()
Expand All @@ -140,8 +142,8 @@ impl Contracts {
.collect()
}

pub fn get_all_contract_addresses_grouped_by_address(
contracts: &[Contract],
pub fn get_all_contract_addresses_grouped_by_address<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<Address, &UnsavedContractAddress> {
contracts.iter().fold(HashMap::new(), |mut contracts_by_addresses, contract| {
contract.addresses.iter().for_each(
Expand Down
29 changes: 24 additions & 5 deletions chaindexing/src/event_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::{sync::Arc, time::Duration};

mod handle_events;
Expand All @@ -9,34 +10,51 @@ use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo};
use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient};

#[derive(Clone)]
pub struct EventHandlerContext<'a> {
pub struct EventHandlerContext<'a, SharedState: Sync + Send + Clone> {
pub event: Event,
raw_query_client: &'a ChaindexingRepoRawQueryTxnClient<'a>,
shared_state: Option<Arc<Mutex<SharedState>>>,
}

impl<'a> EventHandlerContext<'a> {
pub fn new(event: Event, client: &'a ChaindexingRepoRawQueryTxnClient<'a>) -> Self {
impl<'a, SharedState: Sync + Send + Clone> EventHandlerContext<'a, SharedState> {
pub fn new(
event: Event,
client: &'a ChaindexingRepoRawQueryTxnClient<'a>,
shared_state: Option<Arc<Mutex<SharedState>>>,
) -> Self {
Self {
event,
raw_query_client: client,
shared_state,
}
}

pub async fn get_shared_state(&self) -> SharedState {
let shared_state = self.shared_state.clone().unwrap();
let shared_state = shared_state.lock().await;
shared_state.clone()
}

pub(super) fn get_raw_query_client(&self) -> &'a ChaindexingRepoRawQueryTxnClient<'a> {
self.raw_query_client
}
}

#[derive(Clone, Debug)]
pub struct NoSharedState;

#[async_trait::async_trait]
pub trait EventHandler: Send + Sync {
async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a>);
type SharedState: Send + Sync + Clone + Debug;

async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a, Self::SharedState>);
}

// TODO: Use just raw query client through for mutations
pub struct EventHandlers;

impl EventHandlers {
pub fn start(config: &Config) {
pub fn start<S: Send + Sync + Clone + Debug + 'static>(config: &Config<S>) {
let config = config.clone();
tokio::spawn(async move {
let pool = config.repo.get_pool(1).await;
Expand All @@ -56,6 +74,7 @@ impl EventHandlers {
conn.clone(),
&event_handlers_by_event_abi,
&mut raw_query_client,
config.shared_state.clone(),
)
.await;

Expand Down
Loading

0 comments on commit 5134b9a

Please sign in to comment.