diff --git a/ghost-crab/src/block_handler.rs b/ghost-crab/src/block_handler.rs index 4de9626..a51d8d4 100644 --- a/ghost-crab/src/block_handler.rs +++ b/ghost-crab/src/block_handler.rs @@ -31,6 +31,7 @@ pub trait BlockHandler { fn name(&self) -> String; } +#[derive(Clone)] pub struct ProcessBlocksInput { pub handler: BlockHandlerInstance, pub templates: TemplateManager, diff --git a/ghost-crab/src/indexer/indexer.rs b/ghost-crab/src/indexer/indexer.rs index d4b9912..01f7d4e 100644 --- a/ghost-crab/src/indexer/indexer.rs +++ b/ghost-crab/src/indexer/indexer.rs @@ -1,5 +1,5 @@ use crate::block_handler::{process_blocks, BlockHandlerInstance, ProcessBlocksInput}; -use crate::cache::manager::RPCManager; +use crate::cache::manager::{CacheProvider, RPCManager}; use crate::event_handler::{process_events, EventHandlerInstance, ProcessEventsInput}; use ghost_crab_common::config::{self, Config, ConfigError}; @@ -43,20 +43,7 @@ impl Indexer { .remove(&handler.name()) .ok_or(AddHandlerError::NotFound(handler.name()))?; - let network = self - .config - .networks - .get(&event_config.network) - .ok_or(AddHandlerError::NetworkNotFound(event_config.network.clone()))?; - - let provider = self - .rpc_manager - .get_or_create( - event_config.network, - network.rpc_url.clone(), - network.requests_per_second, - ) - .await; + let provider = self.get_provider(&event_config.network).await?; self.handlers.push(ProcessEventsInput { start_block: event_config.start_block, @@ -81,33 +68,39 @@ impl Indexer { .remove(&handler.name()) .ok_or(AddHandlerError::NotFound(handler.name()))?; + let provider = self.get_provider(&block_config.network).await?; + + self.block_handlers.push(ProcessBlocksInput { + handler, + templates: self.templates.clone(), + provider, + config: block_config, + }); + + Ok(()) + } + + async fn get_provider(&mut self, network_name: &str) -> Result { let network = self .config .networks - .get(&block_config.network) - .ok_or(AddHandlerError::NetworkNotFound(block_config.network.clone()))?; + .get(network_name) + .ok_or(AddHandlerError::NetworkNotFound(network_name.to_string()))?; let provider = self .rpc_manager .get_or_create( - block_config.network.clone(), + network_name.to_string(), network.rpc_url.clone(), network.requests_per_second, ) .await; - self.block_handlers.push(ProcessBlocksInput { - handler, - templates: self.templates.clone(), - provider, - config: block_config, - }); - - Ok(()) + Ok(provider) } pub async fn start(mut self) -> Result<(), AddHandlerError> { - for block_handler in self.block_handlers { + for block_handler in self.block_handlers.clone() { tokio::spawn(async move { if let Err(error) = process_blocks(block_handler).await { println!("Error processing logs for block handler: {error}"); @@ -115,7 +108,7 @@ impl Indexer { }); } - for handler in self.handlers { + for handler in self.handlers.clone() { tokio::spawn(async move { if let Err(error) = process_events(handler).await { println!("Error processing logs for handler: {error}"); @@ -125,26 +118,14 @@ impl Indexer { // For dynamic sources (Templates) while let Some(template) = self.rx.recv().await { - let template_config = self + let config = self .config .templates .get(&template.handler.name()) .ok_or(AddHandlerError::NotFound(template.handler.name()))?; - let network = self - .config - .networks - .get(&template_config.network) - .ok_or(AddHandlerError::NetworkNotFound(template_config.network.clone()))?; - - let provider = self - .rpc_manager - .get_or_create( - template_config.network.clone(), - network.rpc_url.clone(), - network.requests_per_second, - ) - .await; + let execution_mode = config.execution_mode.unwrap_or(config::ExecutionMode::Parallel); + let provider = self.get_provider(&config.network.clone()).await?; let handler = ProcessEventsInput { start_block: template.start_block, @@ -153,9 +134,7 @@ impl Indexer { handler: template.handler, templates: self.templates.clone(), provider, - execution_mode: template_config - .execution_mode - .unwrap_or(config::ExecutionMode::Parallel), + execution_mode, }; tokio::spawn(async move {