From 6128230147461a4969317793d76621a806f66551 Mon Sep 17 00:00:00 2001 From: Luis Herasme Date: Thu, 25 Jul 2024 20:53:37 -0400 Subject: [PATCH] feat: load block handler config dynamically at runtime --- ghost-crab-common/src/config.rs | 4 ++-- ghost-crab-macros/src/lib.rs | 41 ++++----------------------------- ghost-crab/src/block_handler.rs | 19 ++++++--------- ghost-crab/src/indexer.rs | 40 +++++++++++++++++++++----------- 4 files changed, 40 insertions(+), 64 deletions(-) diff --git a/ghost-crab-common/src/config.rs b/ghost-crab-common/src/config.rs index 7c9f659..5102546 100644 --- a/ghost-crab-common/src/config.rs +++ b/ghost-crab-common/src/config.rs @@ -32,7 +32,7 @@ pub struct DataSource { #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct BlockHandler { +pub struct BlockHandlerConfig { pub start_block: u64, pub network: String, pub execution_mode: Option, @@ -52,7 +52,7 @@ pub struct Config { pub data_sources: HashMap, pub templates: HashMap, pub networks: HashMap, - pub block_handlers: HashMap, + pub block_handlers: HashMap, } #[derive(Debug)] diff --git a/ghost-crab-macros/src/lib.rs b/ghost-crab-macros/src/lib.rs index 8ea03c6..ace44d8 100644 --- a/ghost-crab-macros/src/lib.rs +++ b/ghost-crab-macros/src/lib.rs @@ -25,21 +25,8 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream { } let config = config::load().unwrap(); - let source = config.block_handlers.get(name).expect("Source not found."); - - let network_config = - config.networks.get(&source.network).expect("RPC url not found for network"); - let rpc_url = Literal::string(&network_config.rpc_url); - let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second); - - let step = Literal::u64_suffixed(source.step); - let start_block = Literal::u64_suffixed(source.start_block); - let network = Literal::string(&source.network); - - let execution_mode = match source.execution_mode { - Some(ExecutionMode::Serial) => quote! { ExecutionMode::Serial }, - _ => quote! { ExecutionMode::Parallel }, - }; + let _ = config.block_handlers.get(name).expect("BlockHandler not found in the config.json"); + let name = Literal::string(name); let parsed = parse_macro_input!(input as ItemFn); let fn_name = parsed.sig.ident.clone(); @@ -61,28 +48,8 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream { #fn_body } - fn step(&self) -> u64 { - #step - } - - fn network(&self) -> String { - String::from(#network) - } - - fn rpc_url(&self) -> String { - String::from(#rpc_url) - } - - fn rate_limit(&self) -> u64 { - #requests_per_second - } - - fn start_block(&self) -> u64 { - #start_block - } - - fn execution_mode(&self) -> ExecutionMode { - #execution_mode + fn name() -> String { + String::from(#name) } } }) diff --git a/ghost-crab/src/block_handler.rs b/ghost-crab/src/block_handler.rs index 4f952b8..3235d7f 100644 --- a/ghost-crab/src/block_handler.rs +++ b/ghost-crab/src/block_handler.rs @@ -6,6 +6,7 @@ use alloy::rpc::types::eth::Block; use alloy::rpc::types::eth::BlockNumberOrTag; use alloy::transports::TransportError; use async_trait::async_trait; +use ghost_crab_common::config::BlockHandlerConfig; use ghost_crab_common::config::ExecutionMode; use std::sync::Arc; use std::time::Duration; @@ -27,28 +28,22 @@ pub type BlockHandlerInstance = Arc>; #[async_trait] pub trait BlockHandler { async fn handle(&self, params: BlockContext); - fn step(&self) -> u64; - fn network(&self) -> String; - fn rpc_url(&self) -> String; - fn start_block(&self) -> u64; - fn rate_limit(&self) -> u64; - fn execution_mode(&self) -> ExecutionMode; + fn name(&self) -> String; } pub struct ProcessBlocksInput { pub handler: BlockHandlerInstance, pub templates: TemplateManager, pub provider: CacheProvider, + pub config: BlockHandlerConfig, } pub async fn process_blocks( - ProcessBlocksInput { handler, templates, provider }: ProcessBlocksInput, + ProcessBlocksInput { handler, templates, provider, config }: ProcessBlocksInput, ) -> Result<(), TransportError> { - let step = handler.step(); - let start_block = handler.start_block(); - let execution_mode = handler.execution_mode(); + let execution_mode = config.execution_mode.unwrap_or(ExecutionMode::Parallel); - let mut current_block = start_block; + let mut current_block = config.start_block; let mut latest_block_manager = LatestBlockManager::new(provider.clone(), Duration::from_secs(10)); @@ -83,6 +78,6 @@ pub async fn process_blocks( } } - current_block += step; + current_block += config.step; } } diff --git a/ghost-crab/src/indexer.rs b/ghost-crab/src/indexer.rs index b4cadbe..653d42f 100644 --- a/ghost-crab/src/indexer.rs +++ b/ghost-crab/src/indexer.rs @@ -2,6 +2,7 @@ use crate::block_handler::{process_blocks, BlockHandlerInstance, ProcessBlocksIn use crate::cache::manager::RPCManager; use crate::event_handler::{process_events, EventHandlerInstance, ProcessEventsInput}; use alloy::primitives::Address; +use ghost_crab_common::config::{self, Config, ConfigError}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -28,19 +29,23 @@ pub struct Indexer { block_handlers: Vec, templates: TemplateManager, rpc_manager: RPCManager, + config: Config, } impl Indexer { - pub fn new() -> Indexer { + pub fn new() -> Result { let (tx, rx) = mpsc::channel::