Skip to content

Commit

Permalink
graph, runtime, chain: Add GasMetrics for DIPS experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Oct 26, 2023
1 parent 19fd41b commit d614c89
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 65 deletions.
3 changes: 2 additions & 1 deletion chain/ethereum/src/runtime/runtime_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ fn ethereum_call(
abis: &[Arc<MappingABI>],
eth_call_gas: Option<u32>,
) -> Result<AscEnumArray<EthereumValueKind>, HostExportError> {
ctx.gas.consume_host_fn(ETHEREUM_CALL)?;
ctx.gas
.consume_host_fn_with_metrics(ETHEREUM_CALL, "ethereum_call")?;

// For apiVersion >= 0.0.4 the call passed from the mapping includes the
// function signature; subgraphs using an apiVersion < 0.0.4 don't pass
Expand Down
5 changes: 3 additions & 2 deletions chain/near/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ mod tests {

use graph::{
anyhow::anyhow,
components::metrics::gas::GasMetrics,
data::subgraph::API_VERSION_0_0_5,
prelude::{hex, BigInt},
runtime::{gas::GasCounter, DeterministicHostError, HostExportError},
Expand All @@ -175,7 +176,7 @@ mod tests {
let mut heap = BytesHeap::new(API_VERSION_0_0_5);
let trigger = NearTrigger::Block(Arc::new(block()));

let result = trigger.to_asc_ptr(&mut heap, &GasCounter::default());
let result = trigger.to_asc_ptr(&mut heap, &GasCounter::new(GasMetrics::mock()));
assert!(result.is_ok());
}

Expand All @@ -188,7 +189,7 @@ mod tests {
receipt: receipt().unwrap(),
}));

let result = trigger.to_asc_ptr(&mut heap, &GasCounter::default());
let result = trigger.to_asc_ptr(&mut heap, &GasCounter::new(GasMetrics::mock()));
assert!(result.is_ok());
}

Expand Down
8 changes: 5 additions & 3 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::subgraph::loader::load_dynamic_data_sources;

use crate::subgraph::runner::SubgraphRunner;
use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::Blockchain;
use graph::blockchain::NodeCapabilities;
use graph::blockchain::{BlockchainKind, TriggerFilter};
use graph::blockchain::{Blockchain, BlockchainKind, NodeCapabilities, TriggerFilter};
use graph::components::metrics::gas::GasMetrics;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data_source::causality_region::CausalityRegionSeq;
Expand Down Expand Up @@ -372,6 +371,8 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
self.metrics_registry.clone(),
);

let gas_metrics = GasMetrics::new(deployment.hash.clone(), self.metrics_registry.clone());

let unified_mapping_api_version = manifest.unified_mapping_api_version()?;
let triggers_adapter = chain.triggers_adapter(&deployment, &required_capabilities, unified_mapping_api_version).map_err(|e|
anyhow!(
Expand All @@ -383,6 +384,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
deployment.hash.as_str(),
stopwatch_metrics.clone(),
gas_metrics.clone(),
));

let subgraph_metrics = Arc::new(SubgraphInstanceMetrics::new(
Expand Down
64 changes: 64 additions & 0 deletions graph/src/components/metrics/gas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use super::MetricsRegistry;
use crate::prelude::DeploymentHash;
use prometheus::CounterVec;
use std::sync::Arc;

#[derive(Clone)]
pub struct GasMetrics {
pub gas_counter: CounterVec,
pub op_counter: CounterVec,
}

impl GasMetrics {
pub fn new(subgraph_id: DeploymentHash, registry: Arc<MetricsRegistry>) -> Self {
let gas_counter = registry
.global_deployment_counter_vec(
"deployment_gas",
"total gas used",
subgraph_id.as_str(),
&["method"],
)
.unwrap_or_else(|_| {
panic!(
"Failed to register deployment_gas prometheus counter for {}",
subgraph_id
)
});

let op_counter = registry
.global_deployment_counter_vec(
"deployment_op_count",
"total number of operations",
subgraph_id.as_str(),
&["method"],
)
.unwrap_or_else(|_| {
panic!(
"Failed to register deployment_op_count prometheus counter for {}",
subgraph_id
)
});

GasMetrics {
gas_counter,
op_counter,
}
}

pub fn mock() -> Self {
let subgraph_id = DeploymentHash::default();
Self::new(subgraph_id, Arc::new(MetricsRegistry::mock()))
}

pub fn track_gas(&self, method: &str, gas_used: u64) {
self.gas_counter
.with_label_values(&[method])
.inc_by(gas_used as f64);
}

pub fn track_operations(&self, method: &str, op_count: u64) {
self.op_counter
.with_label_values(&[method])
.inc_by(op_count as f64);
}
}
2 changes: 2 additions & 0 deletions graph/src/components/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::collections::HashMap;
/// Metrics for measuring where time is spent during indexing.
pub mod stopwatch;

pub mod gas;

/// Create an unregistered counter with labels
pub fn counter_with_labels(
name: &str,
Expand Down
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::Error;
use async_trait::async_trait;
use futures::sync::mpsc;

use crate::components::metrics::gas::GasMetrics;
use crate::components::store::SubgraphFork;
use crate::data_source::{
DataSource, DataSourceTemplate, MappingTrigger, TriggerData, TriggerWithHandler,
Expand Down Expand Up @@ -87,6 +88,7 @@ pub trait RuntimeHost<C: Blockchain>: Send + Sync + 'static {
pub struct HostMetrics {
handler_execution_time: Box<HistogramVec>,
host_fn_execution_time: Box<HistogramVec>,
pub gas_metrics: GasMetrics,
pub stopwatch: StopwatchMetrics,
}

Expand All @@ -95,6 +97,7 @@ impl HostMetrics {
registry: Arc<MetricsRegistry>,
subgraph: &str,
stopwatch: StopwatchMetrics,
gas_metrics: GasMetrics,
) -> Self {
let handler_execution_time = registry
.new_deployment_histogram_vec(
Expand All @@ -118,6 +121,7 @@ impl HostMetrics {
handler_execution_time,
host_fn_execution_time,
stopwatch,
gas_metrics,
}
}

Expand Down
36 changes: 31 additions & 5 deletions graph/src/runtime/gas/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod costs;
mod ops;
mod saturating;
mod size_of;
use crate::components::metrics::gas::GasMetrics;
use crate::prelude::{CheapClone, ENV_VARS};
use crate::runtime::DeterministicHostError;
pub use combinators::*;
Expand Down Expand Up @@ -75,20 +76,33 @@ impl Display for Gas {
}
}

#[derive(Clone, Default)]
pub struct GasCounter(Arc<AtomicU64>);
#[derive(Clone)]
pub struct GasCounter(Arc<AtomicU64>, GasMetrics);

impl CheapClone for GasCounter {}

impl GasCounter {
/// Alias of [`Default::default`].
pub fn new() -> Self {
Self::default()
pub fn new(gas_metrics: GasMetrics) -> Self {
Self {
0: Arc::new(AtomicU64::new(0)),
1: gas_metrics,
}
}

/// This should be called once per host export
pub fn consume_host_fn(&self, mut amount: Gas) -> Result<(), DeterministicHostError> {
pub fn consume_host_fn_inner(
&self,
mut amount: Gas,
method: Option<&str>,
) -> Result<(), DeterministicHostError> {
amount += costs::HOST_EXPORT_GAS;

if let Some(method) = method {
self.1.track_gas(method, amount.0);
self.1.track_operations(method, 1);
}

let old = self
.0
.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_add(amount.0)))
Expand All @@ -104,6 +118,18 @@ impl GasCounter {
}
}

pub fn consume_host_fn(&self, amount: Gas) -> Result<(), DeterministicHostError> {
self.consume_host_fn_inner(amount, Some("untracked"))
}

pub fn consume_host_fn_with_metrics(
&self,
amount: Gas,
method: &str,
) -> Result<(), DeterministicHostError> {
self.consume_host_fn_inner(amount, Some(method))
}

pub fn get(&self) -> Gas {
Gas(self.0.load(SeqCst))
}
Expand Down
11 changes: 9 additions & 2 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use graph::components::metrics::gas::GasMetrics;
use graph::data::store::scalar;
use graph::data::subgraph::*;
use graph::data::value::Word;
Expand Down Expand Up @@ -92,10 +93,14 @@ async fn test_valid_module_and_store_with_timeout(
"test",
metrics_registry.clone(),
);

let gas_metrics = GasMetrics::new(deployment_id.clone(), metrics_registry.clone());

let host_metrics = Arc::new(HostMetrics::new(
metrics_registry,
deployment_id.as_str(),
stopwatch_metrics,
gas_metrics,
));

let experimental_features = ExperimentalFeatures {
Expand Down Expand Up @@ -1238,14 +1243,16 @@ impl Host {
let ctx = mock_context(deployment.clone(), ds, store.subgraph_store(), version);
let host_exports = host_exports::test_support::HostExports::new(&ctx);

let metrics_registry = Arc::new(MetricsRegistry::mock());
let metrics_registry: Arc<MetricsRegistry> = Arc::new(MetricsRegistry::mock());
let stopwatch = StopwatchMetrics::new(
ctx.logger.clone(),
deployment.hash.clone(),
"test",
metrics_registry.clone(),
);
let gas = GasCounter::new();
let gas_metrics = GasMetrics::new(deployment.hash.clone(), metrics_registry);

let gas = GasCounter::new(gas_metrics);

Host {
ctx,
Expand Down
Loading

0 comments on commit d614c89

Please sign in to comment.