From 4dfa1af8413ff0040cf5e87863956969d2c9c15b Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 3 Dec 2024 15:37:00 +0400 Subject: [PATCH] chain/ethereum: Handle subgraph datasource triggers also in `DecoderHook::after_decode` and refactor it --- chain/ethereum/src/data_source.rs | 207 +++++++++++++++++------------- 1 file changed, 119 insertions(+), 88 deletions(-) diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index 071762a1c5b..a2da3e6cb4e 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -8,7 +8,7 @@ use graph::components::trigger_processor::RunnableTriggers; use graph::data_source::common::{ CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedMappingABI, }; -use graph::data_source::CausalityRegion; +use graph::data_source::{CausalityRegion, MappingTrigger as MappingTriggerType}; use graph::env::ENV_VARS; use graph::futures03::future::try_join; use graph::futures03::stream::FuturesOrdered; @@ -1034,6 +1034,115 @@ impl DecoderHook { .collect(); Ok(labels) } + + fn collect_declared_calls<'a>( + &self, + runnables: &Vec>, + ) -> Vec<(Arc, DeclaredCall)> { + // Extract all hosted triggers from runnables + let all_triggers = runnables + .iter() + .flat_map(|runnable| &runnable.hosted_triggers); + + // Collect calls from both onchain and subgraph triggers + let mut all_calls = Vec::new(); + + for trigger in all_triggers { + let host_metrics = trigger.host.host_metrics(); + + match &trigger.mapping_trigger.trigger { + MappingTriggerType::Onchain(t) => { + if let MappingTrigger::Log { calls, .. } = t { + for call in calls.clone() { + all_calls.push((host_metrics.cheap_clone(), call)); + } + } + } + MappingTriggerType::Subgraph(t) => { + for call in t.calls.clone() { + // Convert subgraph call to the expected DeclaredCall type if needed + // or handle differently based on the types + all_calls.push((host_metrics.cheap_clone(), call)); + } + } + MappingTriggerType::Offchain(_) => {} + } + } + + all_calls + } + + /// Deduplicate calls. Unfortunately, we can't get `DeclaredCall` to + /// implement `Hash` or `Ord` easily, so we can only deduplicate by + /// comparing the whole call not with a `HashSet` or `BTreeSet`. + /// Since that can be inefficient, we don't deduplicate if we have an + /// enormous amount of calls; in that case though, things will likely + /// blow up because of the amount of I/O that many calls cause. + /// Cutting off at 1000 is fairly arbitrary + fn deduplicate_calls( + &self, + calls: Vec<(Arc, DeclaredCall)>, + ) -> Vec<(Arc, DeclaredCall)> { + if calls.len() >= 1000 { + return calls; + } + + let mut uniq_calls = Vec::new(); + for (metrics, call) in calls { + if !uniq_calls.iter().any(|(_, c)| c == &call) { + uniq_calls.push((metrics, call)); + } + } + uniq_calls + } + + /// Log information about failed eth calls. 'Failure' here simply + /// means that the call was reverted; outright errors lead to a real + /// error. For reverted calls, `self.eth_calls` returns the label + /// from the manifest for that call. + /// + /// One reason why declared calls can fail is if they are attached + /// to the wrong handler, or if arguments are specified incorrectly. + /// Calls that revert every once in a while might be ok and what the + /// user intended, but we want to clearly log so that users can spot + /// mistakes in their manifest, which will lead to unnecessary eth + /// calls + fn log_declared_call_results( + logger: &Logger, + failures: &[String], + calls_count: usize, + trigger_count: usize, + elapsed: Duration, + ) { + let fail_count = failures.len(); + + if fail_count > 0 { + let mut counts: Vec<_> = failures.iter().counts().into_iter().collect(); + counts.sort_by_key(|(label, _)| *label); + + let failure_summary = counts + .into_iter() + .map(|(label, count)| { + let times = if count == 1 { "time" } else { "times" }; + format!("{label} ({count} {times})") + }) + .join(", "); + + error!(logger, "Declared calls failed"; + "triggers" => trigger_count, + "calls_count" => calls_count, + "fail_count" => fail_count, + "calls_ms" => elapsed.as_millis(), + "failures" => format!("[{}]", failure_summary) + ); + } else { + debug!(logger, "Declared calls"; + "triggers" => trigger_count, + "calls_count" => calls_count, + "calls_ms" => elapsed.as_millis() + ); + } + } } #[async_trait] @@ -1045,50 +1154,6 @@ impl blockchain::DecoderHook for DecoderHook { runnables: Vec>, metrics: &Arc, ) -> Result>, MappingError> { - /// Log information about failed eth calls. 'Failure' here simply - /// means that the call was reverted; outright errors lead to a real - /// error. For reverted calls, `self.eth_calls` returns the label - /// from the manifest for that call. - /// - /// One reason why declared calls can fail is if they are attached - /// to the wrong handler, or if arguments are specified incorrectly. - /// Calls that revert every once in a while might be ok and what the - /// user intended, but we want to clearly log so that users can spot - /// mistakes in their manifest, which will lead to unnecessary eth - /// calls - fn log_results( - logger: &Logger, - failures: &[String], - calls_count: usize, - trigger_count: usize, - elapsed: Duration, - ) { - let fail_count = failures.len(); - - if fail_count > 0 { - let mut counts: Vec<_> = failures.iter().counts().into_iter().collect(); - counts.sort_by_key(|(label, _)| *label); - let counts = counts - .into_iter() - .map(|(label, count)| { - let times = if count == 1 { "time" } else { "times" }; - format!("{label} ({count} {times})") - }) - .join(", "); - error!(logger, "Declared calls failed"; - "triggers" => trigger_count, - "calls_count" => calls_count, - "fail_count" => fail_count, - "calls_ms" => elapsed.as_millis(), - "failures" => format!("[{}]", counts)); - } else { - debug!(logger, "Declared calls"; - "triggers" => trigger_count, - "calls_count" => calls_count, - "calls_ms" => elapsed.as_millis()); - } - } - if ENV_VARS.mappings.disable_declared_calls { return Ok(runnables); } @@ -1096,51 +1161,17 @@ impl blockchain::DecoderHook for DecoderHook { let _section = metrics.stopwatch.start_section("declared_ethereum_call"); let start = Instant::now(); - let calls: Vec<_> = runnables - .iter() - .map(|r| &r.hosted_triggers) - .flatten() - .filter_map(|trigger| { - trigger - .mapping_trigger - .trigger - .as_onchain() - .map(|t| (trigger.host.host_metrics(), t)) - }) - .filter_map(|(metrics, trigger)| match trigger { - MappingTrigger::Log { calls, .. } => Some( - calls - .clone() - .into_iter() - .map(move |call| (metrics.cheap_clone(), call)), - ), - MappingTrigger::Block { .. } | MappingTrigger::Call { .. } => None, - }) - .flatten() - .collect(); + // Collect and process declared calls + let calls = self.collect_declared_calls(&runnables); + let deduplicated_calls = self.deduplicate_calls(calls); - // Deduplicate calls. Unfortunately, we can't get `DeclaredCall` to - // implement `Hash` or `Ord` easily, so we can only deduplicate by - // comparing the whole call not with a `HashSet` or `BTreeSet`. - // Since that can be inefficient, we don't deduplicate if we have an - // enormous amount of calls; in that case though, things will likely - // blow up because of the amount of I/O that many calls cause. - // Cutting off at 1000 is fairly arbitrary - let calls = if calls.len() < 1000 { - let mut uniq_calls = Vec::new(); - for (metrics, call) in calls { - if !uniq_calls.iter().any(|(_, c)| c == &call) { - uniq_calls.push((metrics, call)); - } - } - uniq_calls - } else { - calls - }; + // Execute calls and log results + let calls_count = deduplicated_calls.len(); + let results = self + .eth_calls(logger, block_ptr, deduplicated_calls) + .await?; - let calls_count = calls.len(); - let results = self.eth_calls(logger, block_ptr, calls).await?; - log_results( + Self::log_declared_call_results( logger, &results, calls_count,