Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics for DIPS experiments #4940

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chain/ethereum/src/runtime/runtime_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ fn ethereum_call(
abis: &[Arc<MappingABI>],
eth_call_gas: Option<u32>,
) -> Result<AscEnumArray<EthereumValueKind>, HostExportError> {
ctx.gas.consume_host_fn(ETHEREUM_CALL)?;
ctx.gas
.consume_host_fn_with_metrics(ETHEREUM_CALL, "ethereum_call")?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a macro crate function_name! https://docs.rs/function_name/latest/function_name/

Which can be potentially used here instead of hardcoding then name. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nicest would be an enum for the gas ops, but you don't need to spend time refactoring that right now.


// For apiVersion >= 0.0.4 the call passed from the mapping includes the
// function signature; subgraphs using an apiVersion < 0.0.4 don't pass
Expand Down
5 changes: 3 additions & 2 deletions chain/near/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ mod tests {

use graph::{
anyhow::anyhow,
components::metrics::gas::GasMetrics,
data::subgraph::API_VERSION_0_0_5,
prelude::{hex, BigInt},
runtime::{gas::GasCounter, DeterministicHostError, HostExportError},
Expand All @@ -175,7 +176,7 @@ mod tests {
let mut heap = BytesHeap::new(API_VERSION_0_0_5);
let trigger = NearTrigger::Block(Arc::new(block()));

let result = trigger.to_asc_ptr(&mut heap, &GasCounter::default());
let result = trigger.to_asc_ptr(&mut heap, &GasCounter::new(GasMetrics::mock()));
assert!(result.is_ok());
}

Expand All @@ -188,7 +189,7 @@ mod tests {
receipt: receipt().unwrap(),
}));

let result = trigger.to_asc_ptr(&mut heap, &GasCounter::default());
let result = trigger.to_asc_ptr(&mut heap, &GasCounter::new(GasMetrics::mock()));
assert!(result.is_ok());
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::collections::BTreeSet;
use crate::subgraph::runner::SubgraphRunner;
use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::metrics::gas::GasMetrics;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data_source::causality_region::CausalityRegionSeq;
Expand Down Expand Up @@ -344,6 +345,8 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
self.metrics_registry.clone(),
);

let gas_metrics = GasMetrics::new(deployment.hash.clone(), self.metrics_registry.clone());

let unified_mapping_api_version = manifest.unified_mapping_api_version()?;
let triggers_adapter = chain.triggers_adapter(&deployment, &required_capabilities, unified_mapping_api_version).map_err(|e|
anyhow!(
Expand All @@ -355,6 +358,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
deployment.hash.as_str(),
stopwatch_metrics.clone(),
gas_metrics.clone(),
));

let subgraph_metrics = Arc::new(SubgraphInstanceMetrics::new(
Expand Down
64 changes: 64 additions & 0 deletions graph/src/components/metrics/gas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use super::MetricsRegistry;
use crate::prelude::DeploymentHash;
use prometheus::CounterVec;
use std::sync::Arc;

#[derive(Clone)]
pub struct GasMetrics {
pub gas_counter: CounterVec,
pub op_counter: CounterVec,
}

impl GasMetrics {
pub fn new(subgraph_id: DeploymentHash, registry: Arc<MetricsRegistry>) -> Self {
let gas_counter = registry
.global_deployment_counter_vec(
"deployment_gas",
"total gas used",
subgraph_id.as_str(),
&["method"],
)
.unwrap_or_else(|err| {
panic!(
"Failed to register deployment_gas prometheus counter for {}: {}",
subgraph_id, err
)
});

let op_counter = registry
.global_deployment_counter_vec(
"deployment_op_count",
"total number of operations",
subgraph_id.as_str(),
&["method"],
)
.unwrap_or_else(|err| {
panic!(
"Failed to register deployment_op_count prometheus counter for {}: {}",
subgraph_id, err
)
});

GasMetrics {
gas_counter,
op_counter,
}
}

pub fn mock() -> Self {
let subgraph_id = DeploymentHash::default();
Self::new(subgraph_id, Arc::new(MetricsRegistry::mock()))
}

pub fn track_gas(&self, method: &str, gas_used: u64) {
self.gas_counter
.with_label_values(&[method])
.inc_by(gas_used as f64);
}

pub fn track_operations(&self, method: &str, op_count: u64) {
self.op_counter
.with_label_values(&[method])
.inc_by(op_count as f64);
}
}
2 changes: 2 additions & 0 deletions graph/src/components/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::collections::HashMap;
/// Metrics for measuring where time is spent during indexing.
pub mod stopwatch;

pub mod gas;

/// Create an unregistered counter with labels
pub fn counter_with_labels(
name: &str,
Expand Down
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::Error;
use async_trait::async_trait;
use futures::sync::mpsc;

use crate::components::metrics::gas::GasMetrics;
use crate::components::store::SubgraphFork;
use crate::data_source::{
DataSource, DataSourceTemplate, MappingTrigger, TriggerData, TriggerWithHandler,
Expand Down Expand Up @@ -87,6 +88,7 @@ pub trait RuntimeHost<C: Blockchain>: Send + Sync + 'static {
pub struct HostMetrics {
handler_execution_time: Box<HistogramVec>,
host_fn_execution_time: Box<HistogramVec>,
pub gas_metrics: GasMetrics,
pub stopwatch: StopwatchMetrics,
}

Expand All @@ -95,6 +97,7 @@ impl HostMetrics {
registry: Arc<MetricsRegistry>,
subgraph: &str,
stopwatch: StopwatchMetrics,
gas_metrics: GasMetrics,
) -> Self {
let handler_execution_time = registry
.new_deployment_histogram_vec(
Expand All @@ -118,6 +121,7 @@ impl HostMetrics {
handler_execution_time,
host_fn_execution_time,
stopwatch,
gas_metrics,
}
}

Expand Down
7 changes: 7 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ pub struct EnvVars {
pub subgraph_settings: Option<String>,
/// Whether to prefer substreams blocks streams over firehose when available.
pub prefer_substreams_block_streams: bool,
/// Set by the flag `GRAPH_ENABLE_GAS_METRICS`. Whether to enable
/// gas metrics. Off by default.
pub enable_gas_metrics: bool,
}

impl EnvVars {
Expand Down Expand Up @@ -236,6 +239,7 @@ impl EnvVars {
reorg_threshold: inner.reorg_threshold,
subgraph_settings: inner.subgraph_settings,
prefer_substreams_block_streams: inner.prefer_substreams_block_streams,
enable_gas_metrics: inner.enable_gas_metrics.0,
})
}

Expand Down Expand Up @@ -361,6 +365,9 @@ struct Inner {
default = "false"
)]
prefer_substreams_block_streams: bool,

#[envconfig(from = "GRAPH_ENABLE_GAS_METRICS", default = "false")]
enable_gas_metrics: EnvVarBoolean,
}

#[derive(Clone, Debug)]
Expand Down
47 changes: 39 additions & 8 deletions graph/src/runtime/gas/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod costs;
mod ops;
mod saturating;
mod size_of;
use crate::components::metrics::gas::GasMetrics;
use crate::prelude::{CheapClone, ENV_VARS};
use crate::runtime::DeterministicHostError;
pub use combinators::*;
Expand Down Expand Up @@ -75,22 +76,40 @@ impl Display for Gas {
}
}

#[derive(Clone, Default)]
pub struct GasCounter(Arc<AtomicU64>);
#[derive(Clone)]
pub struct GasCounter {
counter: Arc<AtomicU64>,
metrics: GasMetrics,
}

impl CheapClone for GasCounter {}

impl GasCounter {
/// Alias of [`Default::default`].
pub fn new() -> Self {
Self::default()
pub fn new(metrics: GasMetrics) -> Self {
Self {
counter: Arc::new(AtomicU64::new(0)),
metrics,
}
}

/// This should be called once per host export
pub fn consume_host_fn(&self, mut amount: Gas) -> Result<(), DeterministicHostError> {
pub fn consume_host_fn_inner(
&self,
mut amount: Gas,
method: Option<&str>,
) -> Result<(), DeterministicHostError> {
amount += costs::HOST_EXPORT_GAS;

// If gas metrics are enabled, track the gas used
if ENV_VARS.enable_gas_metrics {
if let Some(method) = method {
self.metrics.track_gas(method, amount.0);
self.metrics.track_operations(method, 1);
}
}

let old = self
.0
.counter
.fetch_update(SeqCst, SeqCst, |v| Some(v.saturating_add(amount.0)))
.unwrap();
let new = old.saturating_add(amount.0);
Expand All @@ -104,7 +123,19 @@ impl GasCounter {
}
}

pub fn consume_host_fn(&self, amount: Gas) -> Result<(), DeterministicHostError> {
self.consume_host_fn_inner(amount, Some("untracked"))
}

pub fn consume_host_fn_with_metrics(
&self,
amount: Gas,
method: &str,
) -> Result<(), DeterministicHostError> {
self.consume_host_fn_inner(amount, Some(method))
}

pub fn get(&self) -> Gas {
Gas(self.0.load(SeqCst))
Gas(self.counter.load(SeqCst))
}
}
11 changes: 9 additions & 2 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use graph::components::metrics::gas::GasMetrics;
use graph::data::store::scalar;
use graph::data::subgraph::*;
use graph::data::value::Word;
Expand Down Expand Up @@ -92,10 +93,14 @@ async fn test_valid_module_and_store_with_timeout(
"test",
metrics_registry.clone(),
);

let gas_metrics = GasMetrics::new(deployment_id.clone(), metrics_registry.clone());

let host_metrics = Arc::new(HostMetrics::new(
metrics_registry,
deployment_id.as_str(),
stopwatch_metrics,
gas_metrics,
));

let experimental_features = ExperimentalFeatures {
Expand Down Expand Up @@ -1243,14 +1248,16 @@ impl Host {
let ctx = mock_context(deployment.clone(), ds, store.subgraph_store(), version);
let host_exports = host_exports::test_support::HostExports::new(&ctx);

let metrics_registry = Arc::new(MetricsRegistry::mock());
let metrics_registry: Arc<MetricsRegistry> = Arc::new(MetricsRegistry::mock());
let stopwatch = StopwatchMetrics::new(
ctx.logger.clone(),
deployment.hash.clone(),
"test",
metrics_registry.clone(),
);
let gas = GasCounter::new();
let gas_metrics = GasMetrics::new(deployment.hash.clone(), metrics_registry);

let gas = GasCounter::new(gas_metrics);

Host {
ctx,
Expand Down
Loading