Skip to content

Commit

Permalink
Merge pull request #52 from chaindexing/introduce-shared-state
Browse files Browse the repository at this point in the history
Introduce SharedState In EventHandlers (Breaking Change)
  • Loading branch information
Jurshsmith authored Dec 23, 2023
2 parents 549dfb8 + 5134b9a commit 49e4da9
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 77 deletions.
4 changes: 2 additions & 2 deletions chaindexing-tests/src/factory/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chaindexing::{Chain, Contract};
use chaindexing::{Chain, Contract, NoSharedState};

use super::{ApprovalForAllTestEventHandler, TransferTestEventHandler};

Expand All @@ -10,7 +10,7 @@ pub const APPROCAL_EVENT_ABI: &str =

pub const BAYC_CONTRACT_ADDRESS: &str = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D";
pub const BAYC_CONTRACT_START_BLOCK_NUMBER: u32 = 17773490;
pub fn bayc_contract() -> Contract {
pub fn bayc_contract() -> Contract<NoSharedState> {
Contract::new("BoredApeYachtClub")
.add_event(TRANSFER_EVENT_ABI, TransferTestEventHandler)
.add_event(APPROCAL_EVENT_ABI, ApprovalForAllTestEventHandler)
Expand Down
10 changes: 7 additions & 3 deletions chaindexing-tests/src/factory/event_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chaindexing::{EventContext, EventHandler};
use chaindexing::{EventContext, EventHandler, NoSharedState};

#[derive(Clone, Debug)]
pub struct NftState;
Expand All @@ -7,12 +7,16 @@ pub struct TransferTestEventHandler;

#[async_trait::async_trait]
impl EventHandler for TransferTestEventHandler {
async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {}
type SharedState = NoSharedState;

async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {}
}

pub struct ApprovalForAllTestEventHandler;

#[async_trait::async_trait]
impl EventHandler for ApprovalForAllTestEventHandler {
async fn handle_event<'a>(&self, _event_context: EventContext<'a>) {}
type SharedState = NoSharedState;

async fn handle_event<'a>(&self, _event_context: EventContext<'a, Self::SharedState>) {}
}
4 changes: 2 additions & 2 deletions chaindexing-tests/src/factory/events.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;

use chaindexing::{Contract, Event, Events};
use chaindexing::{Contract, Event, Events, NoSharedState};
use ethers::types::Block;

use super::{transfer_log, BAYC_CONTRACT_ADDRESS};

pub fn transfer_event_with_contract(contract: Contract) -> Event {
pub fn transfer_event_with_contract(contract: Contract<NoSharedState>) -> Event {
let contract_address = BAYC_CONTRACT_ADDRESS;
let transfer_log = transfer_log(contract_address);
let blocks_by_number = HashMap::from([(
Expand Down
2 changes: 1 addition & 1 deletion chaindexing-tests/src/factory/json_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ macro_rules! json_rpc_with_logs {
json_rpc_with_logs!($contract_address, 17774490)
}};
($contract_address:expr, $current_block_number:expr) => {{
use crate::factory::transfer_log;
use chaindexing::EventsIngesterJsonRpc;
use ethers::providers::ProviderError;
use ethers::types::{Block, Filter, Log, TxHash, U64};
use $crate::factory::transfer_log;

#[derive(Clone)]
struct JsonRpc;
Expand Down
11 changes: 7 additions & 4 deletions chaindexing-tests/src/tests/contract_states.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient};
use chaindexing::{ChaindexingRepo, EventContext, HasRawQueryClient, NoSharedState};

use super::*;
use crate::factory::{bayc_contract, transfer_event_with_contract};
Expand All @@ -12,9 +12,10 @@ mod tests {
let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await;
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await;
let event_context = EventContext::new(
let event_context: EventContext<'_, NoSharedState> = EventContext::new(
transfer_event_with_contract(bayc_contract),
&raw_query_txn_client,
None,
);

let new_state = NftState { token_id: 2 };
Expand All @@ -37,9 +38,10 @@ mod tests {
let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await;
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await;
let event_context = EventContext::new(
let event_context: EventContext<'_, NoSharedState> = EventContext::new(
transfer_event_with_contract(bayc_contract),
&raw_query_txn_client,
None,
);

let new_state = NftState { token_id: 1 };
Expand Down Expand Up @@ -68,9 +70,10 @@ mod tests {
let mut raw_query_client = test_runner::new_repo().get_raw_query_client().await;
let raw_query_txn_client =
ChaindexingRepo::get_raw_query_txn_client(&mut raw_query_client).await;
let event_context = EventContext::new(
let event_context: EventContext<'_, NoSharedState> = EventContext::new(
transfer_event_with_contract(bayc_contract),
&raw_query_txn_client,
None,
);

let new_state = NftState { token_id: 9 };
Expand Down
5 changes: 3 additions & 2 deletions chaindexing-tests/src/tests/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ mod tests {
json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner,
};
use chaindexing::{
Chain, ChaindexingRepo, EventsIngester, HasRawQueryClient, MinConfirmationCount, Repo,
Chain, ChaindexingRepo, Contract, EventsIngester, HasRawQueryClient, MinConfirmationCount,
NoSharedState, Repo,
};

#[tokio::test]
Expand Down Expand Up @@ -151,7 +152,7 @@ mod tests {
let pool = test_runner::get_pool().await;

test_runner::run_test(&pool, |conn| async move {
let contracts = vec![];
let contracts: Vec<Contract<NoSharedState>> = vec![];
let json_rpc = Arc::new(empty_json_rpc());
let blocks_per_batch = 10;
let conn = Arc::new(Mutex::new(conn));
Expand Down
15 changes: 9 additions & 6 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use ethers::types::Chain;
use tokio::sync::Mutex;

use crate::{ChaindexingRepo, Chains, Contract, MinConfirmationCount};

Expand All @@ -22,19 +23,20 @@ impl std::fmt::Debug for ConfigError {
}
}
#[derive(Clone)]
pub struct Config {
pub struct Config<SharedState: Sync + Send + Clone> {
pub chains: Chains,
pub repo: ChaindexingRepo,
pub contracts: Vec<Contract>,
pub contracts: Vec<Contract<SharedState>>,
pub min_confirmation_count: MinConfirmationCount,
pub blocks_per_batch: u64,
pub handler_rate_ms: u64,
pub ingestion_rate_ms: u64,
pub reset_count: u8,
pub shared_state: Option<Arc<Mutex<SharedState>>>,
}

impl Config {
pub fn new(repo: ChaindexingRepo) -> Self {
impl<SharedState: Sync + Send + Clone> Config<SharedState> {
pub fn new(repo: ChaindexingRepo, initial_state: Option<SharedState>) -> Self {
Self {
repo,
chains: HashMap::new(),
Expand All @@ -44,6 +46,7 @@ impl Config {
handler_rate_ms: 4000,
ingestion_rate_ms: 4000,
reset_count: 0,
shared_state: initial_state.map(|s| Arc::new(Mutex::new(s))),
}
}

Expand All @@ -53,7 +56,7 @@ impl Config {
self
}

pub fn add_contract(mut self, contract: Contract) -> Self {
pub fn add_contract(mut self, contract: Contract<SharedState>) -> Self {
self.contracts.push(contract);

self
Expand Down
18 changes: 11 additions & 7 deletions chaindexing/src/contract_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait ContractState:
StateView::get_complete(&view, table_name, client).await
}

async fn create<'a>(&self, context: &EventHandlerContext) {
async fn create<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext<S>) {
let event = &context.event;
let client = context.get_raw_query_client();

Expand All @@ -87,7 +87,11 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

async fn update<'a>(&self, updates: HashMap<String, String>, context: &EventHandlerContext) {
async fn update<'a, S: Send + Sync + Clone>(
&self,
updates: HashMap<String, String>,
context: &EventHandlerContext<S>,
) {
let event = &context.event;
let client = context.get_raw_query_client();

Expand All @@ -99,7 +103,7 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

async fn delete<'a>(&self, context: &EventHandlerContext) {
async fn delete<'a, S: Send + Sync + Clone>(&self, context: &EventHandlerContext<S>) {
let event = &context.event;
let client = context.get_raw_query_client();

Expand All @@ -111,18 +115,18 @@ pub trait ContractState:
StateView::refresh(&latest_state_version, table_name, client).await;
}

async fn read_one<'a>(
async fn read_one<'a, S: Send + Sync + Clone>(
filters: HashMap<String, String>,
context: &EventHandlerContext,
context: &EventHandlerContext<S>,
) -> Option<Self> {
let states = Self::read_many(filters, context).await;

states.first().cloned()
}

async fn read_many<'a>(
async fn read_many<'a, S: Send + Sync + Clone>(
filters: HashMap<String, String>,
context: &EventHandlerContext,
context: &EventHandlerContext<S>,
) -> Vec<Self> {
let client = context.get_raw_query_client();

Expand Down
32 changes: 17 additions & 15 deletions chaindexing/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ impl ContractEvent {
type EventAbi = &'static str;

#[derive(Clone)]
pub struct Contract {
pub struct Contract<S: Send + Sync + Clone> {
pub addresses: Vec<UnsavedContractAddress>,
pub name: String,
pub event_handlers: HashMap<EventAbi, Arc<dyn EventHandler>>,
pub event_handlers: HashMap<EventAbi, Arc<dyn EventHandler<SharedState = S>>>,
pub state_migrations: Vec<Arc<dyn ContractStateMigrations>>,
}

impl Contract {
impl<S: Send + Sync + Clone> Contract<S> {
pub fn new(name: &str) -> Self {
Self {
addresses: vec![],
Expand All @@ -47,7 +47,7 @@ impl Contract {
}
}

pub fn add_address(&self, address: &str, chain: &Chain, start_block_number: i64) -> Self {
pub fn add_address(&mut self, address: &str, chain: &Chain, start_block_number: i64) -> Self {
let mut addresses = self.addresses.clone();

addresses.push(UnsavedContractAddress::new(
Expand All @@ -66,7 +66,7 @@ impl Contract {
pub fn add_event(
mut self,
event_abi: EventAbi,
event_handler: impl EventHandler + 'static,
event_handler: impl EventHandler<SharedState = S> + 'static,
) -> Self {
self.event_handlers.insert(event_abi, Arc::new(event_handler));

Expand Down Expand Up @@ -101,13 +101,15 @@ impl Contract {
pub struct Contracts;

impl Contracts {
pub fn get_state_migrations(contracts: &[Contract]) -> Vec<Arc<dyn ContractStateMigrations>> {
pub fn get_state_migrations<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> Vec<Arc<dyn ContractStateMigrations>> {
contracts.iter().flat_map(|c| c.state_migrations.clone()).collect()
}

pub fn get_all_event_handlers_by_event_abi(
contracts: &[Contract],
) -> HashMap<EventAbi, Arc<dyn EventHandler>> {
pub fn get_all_event_handlers_by_event_abi<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<EventAbi, Arc<dyn EventHandler<SharedState = S>>> {
contracts.iter().fold(
HashMap::new(),
|mut event_handlers_by_event_abi, contract| {
Expand All @@ -120,8 +122,8 @@ impl Contracts {
)
}

pub fn group_event_topics_by_names(
contracts: &[Contract],
pub fn group_event_topics_by_names<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<String, Vec<ContractEventTopic>> {
contracts.iter().fold(HashMap::new(), |mut topics_by_contract_name, contract| {
topics_by_contract_name.insert(contract.name.clone(), contract.get_event_topics());
Expand All @@ -130,8 +132,8 @@ impl Contracts {
})
}

pub fn group_events_by_topics(
contracts: &[Contract],
pub fn group_events_by_topics<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<ContractEventTopic, ContractEvent> {
contracts
.iter()
Expand All @@ -140,8 +142,8 @@ impl Contracts {
.collect()
}

pub fn get_all_contract_addresses_grouped_by_address(
contracts: &[Contract],
pub fn get_all_contract_addresses_grouped_by_address<S: Send + Sync + Clone>(
contracts: &[Contract<S>],
) -> HashMap<Address, &UnsavedContractAddress> {
contracts.iter().fold(HashMap::new(), |mut contracts_by_addresses, contract| {
contract.addresses.iter().for_each(
Expand Down
29 changes: 24 additions & 5 deletions chaindexing/src/event_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::{sync::Arc, time::Duration};

mod handle_events;
Expand All @@ -9,34 +10,51 @@ use crate::{contracts::Contracts, events::Event, ChaindexingRepo, Config, Repo};
use crate::{ChaindexingRepoRawQueryTxnClient, HasRawQueryClient};

#[derive(Clone)]
pub struct EventHandlerContext<'a> {
pub struct EventHandlerContext<'a, SharedState: Sync + Send + Clone> {
pub event: Event,
raw_query_client: &'a ChaindexingRepoRawQueryTxnClient<'a>,
shared_state: Option<Arc<Mutex<SharedState>>>,
}

impl<'a> EventHandlerContext<'a> {
pub fn new(event: Event, client: &'a ChaindexingRepoRawQueryTxnClient<'a>) -> Self {
impl<'a, SharedState: Sync + Send + Clone> EventHandlerContext<'a, SharedState> {
pub fn new(
event: Event,
client: &'a ChaindexingRepoRawQueryTxnClient<'a>,
shared_state: Option<Arc<Mutex<SharedState>>>,
) -> Self {
Self {
event,
raw_query_client: client,
shared_state,
}
}

pub async fn get_shared_state(&self) -> SharedState {
let shared_state = self.shared_state.clone().unwrap();
let shared_state = shared_state.lock().await;
shared_state.clone()
}

pub(super) fn get_raw_query_client(&self) -> &'a ChaindexingRepoRawQueryTxnClient<'a> {
self.raw_query_client
}
}

#[derive(Clone, Debug)]
pub struct NoSharedState;

#[async_trait::async_trait]
pub trait EventHandler: Send + Sync {
async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a>);
type SharedState: Send + Sync + Clone + Debug;

async fn handle_event<'a>(&self, event_context: EventHandlerContext<'a, Self::SharedState>);
}

// TODO: Use just raw query client through for mutations
pub struct EventHandlers;

impl EventHandlers {
pub fn start(config: &Config) {
pub fn start<S: Send + Sync + Clone + Debug + 'static>(config: &Config<S>) {
let config = config.clone();
tokio::spawn(async move {
let pool = config.repo.get_pool(1).await;
Expand All @@ -56,6 +74,7 @@ impl EventHandlers {
conn.clone(),
&event_handlers_by_event_abi,
&mut raw_query_client,
config.shared_state.clone(),
)
.await;

Expand Down
Loading

0 comments on commit 49e4da9

Please sign in to comment.