Skip to content
This repository has been archived by the owner on Aug 2, 2024. It is now read-only.

add prometheus logs for da layer #1347

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- chore: added prometheus metrics for da layer
- chore: bump celestia rpc crate version
- fix(DA): run the proof first then the state update
- fix: `prove_current_block` is called after `update_state`
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/client/data-availability/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ mp-digest-log = { workspace = true, default-features = true }
mp-hashers = { workspace = true, default-features = true }
mp-storage = { workspace = true, default-features = true }

# Prometheus
prometheus-endpoint = { workspace = true }

# Optional
clap = { workspace = true, features = ["derive"], optional = true }

Expand Down
10 changes: 10 additions & 0 deletions crates/client/data-availability/src/avail/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{anyhow, Result};
Expand All @@ -11,9 +12,14 @@ use avail_subxt::primitives::AvailExtrinsicParams;
use avail_subxt::{api as AvailApi, build_client, AvailConfig};
use ethers::types::{I256, U256};
use futures::lock::Mutex;
use futures::stream::iter;
use jsonrpsee::tracing::Instrument;
use prometheus_endpoint::prometheus::core::Metric;
use prometheus_endpoint::prometheus::proto::LabelPair;
use subxt::ext::sp_core::sr25519::Pair;
use subxt::OnlineClient;

use crate::da_metrics::DaMetrics;
use crate::utils::get_bytes_from_state_diff;
use crate::{DaClient, DaMode};

Expand Down Expand Up @@ -82,6 +88,10 @@ impl DaClient for AvailClient {
fn get_mode(&self) -> DaMode {
self.mode
}

fn get_da_metric_labels(&self) -> HashMap<String, String> {
[("name".into(), "avail".into()), ("app_id".into(), self.app_id.0.to_string())].iter().cloned().collect()
}
}

impl AvailClient {
Expand Down
9 changes: 9 additions & 0 deletions crates/client/data-availability/src/celestia/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod config;

use std::collections::HashMap;

use anyhow::Result;
use async_trait::async_trait;
use celestia_rpc::{BlobClient, HeaderClient};
Expand All @@ -8,8 +10,11 @@ use celestia_types::nmt::Namespace;
use celestia_types::{Blob, Result as CelestiaTypesResult};
use ethers::types::{I256, U256};
use jsonrpsee::http_client::{HeaderMap, HeaderValue, HttpClient, HttpClientBuilder};
use prometheus_endpoint::prometheus::core::Metric;
use prometheus_endpoint::prometheus::proto::LabelPair;
use reqwest::header;

use crate::da_metrics::DaMetrics;
use crate::{DaClient, DaMode};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -46,6 +51,10 @@ impl DaClient for CelestiaClient {
fn get_mode(&self) -> DaMode {
self.mode
}

fn get_da_metric_labels(&self) -> HashMap<String, String> {
[("name".into(), "celesia".into())].iter().cloned().collect()
}
}

impl CelestiaClient {
Expand Down
29 changes: 29 additions & 0 deletions crates/client/data-availability/src/da_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use prometheus_endpoint::prometheus::core::AtomicU64;
use prometheus_endpoint::{register, Gauge, Histogram, HistogramOpts, Opts, PrometheusError, Registry};

#[derive(Clone, Debug)]
pub struct DaMetrics {
pub state_updates: Histogram,
pub state_proofs: Histogram,
}

impl DaMetrics {
pub fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
state_updates: register(
Histogram::with_opts(HistogramOpts::new(
"madara_da_state_updates",
"Histogram of time taken for state updates",
))?,
registry,
)?,
state_proofs: register(
Histogram::with_opts(HistogramOpts::new(
"madara_da_state_proofs",
"Histogram of time taken for state proofs",
))?,
registry,
)?,
})
}
}
8 changes: 8 additions & 0 deletions crates/client/data-availability/src/ethereum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
Expand All @@ -8,7 +9,10 @@ use ethers::prelude::{abigen, SignerMiddleware};
use ethers::providers::{Http, Provider};
use ethers::signers::{LocalWallet, Signer};
use ethers::types::{Address, I256, U256};
use prometheus_endpoint::prometheus::core::Metric;
use prometheus_endpoint::prometheus::proto::LabelPair;

use crate::da_metrics::DaMetrics;
use crate::utils::is_valid_http_endpoint;
use crate::{DaClient, DaMode};

Expand Down Expand Up @@ -59,6 +63,10 @@ impl DaClient for EthereumClient {
fn get_mode(&self) -> DaMode {
self.mode
}

fn get_da_metric_labels(&self) -> HashMap<String, String> {
[("name".into(), "ethereum".into())].iter().cloned().collect()
}
}

impl TryFrom<config::EthereumConfig> for EthereumClient {
Expand Down
41 changes: 41 additions & 0 deletions crates/client/data-availability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ pub mod ethereum;
mod sharp;
pub mod utils;

mod da_metrics;

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand All @@ -14,12 +18,16 @@ use futures::channel::mpsc;
use futures::StreamExt;
use mc_commitment_state_diff::BlockDAData;
use mp_hashers::HasherT;
use prometheus_endpoint::prometheus::core::AtomicU64;
use prometheus_endpoint::{register, Gauge, Opts, Registry as PrometheusRegistry};
use serde::{Deserialize, Serialize};
use sp_runtime::traits::Block as BlockT;
use starknet_api::block::BlockHash;
use starknet_api::state::ThinStateDiff;
use utils::state_diff_to_calldata;

use crate::da_metrics::DaMetrics;

pub struct DataAvailabilityWorker<B, H>(PhantomData<(B, H)>);

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -68,6 +76,7 @@ pub trait DaClient: Send + Sync {
fn get_mode(&self) -> DaMode;
async fn last_published_state(&self) -> Result<I256>;
async fn publish_state_diff(&self, state_diff: Vec<U256>) -> Result<()>;
fn get_da_metric_labels(&self) -> HashMap<String, String>;
}

/// The client worker for DA related tasks
Expand All @@ -83,29 +92,61 @@ where
{
pub async fn prove_current_block(
da_client: Arc<dyn DaClient + Send + Sync>,
prometheus: Option<PrometheusRegistry>,
mut state_diffs_rx: mpsc::Receiver<BlockDAData>,
madara_backend: Arc<mc_db::Backend<B>>,
) {
let da_metrics = prometheus.as_ref().and_then(|registry| DaMetrics::register(registry).ok());
if let Some(registry) = prometheus.as_ref() {
let gauge = Gauge::<AtomicU64>::with_opts(
Opts::new("madara_da_layer_info", "Information about the data availability layer used")
.const_labels(da_client.get_da_metric_labels()),
);
match gauge {
Ok(gauge) => match register(gauge, registry) {
Ok(_) => (),
Err(e) => {
log::error!("failed to register gauge for da layer info metrics: {e}");
}
},
Err(e) => {
log::error!("failed to create gauge for da layer info metrics: {e}");
}
}
}
while let Some(BlockDAData(starknet_block_hash, csd, num_addr_accessed)) = state_diffs_rx.next().await {
log::info!(
"received state diff for block {starknet_block_hash}: {csd:?}.{num_addr_accessed} addresses accessed."
);

let da_metrics = da_metrics.clone();
let da_client = da_client.clone();
let madara_backend = madara_backend.clone();
tokio::spawn(async move {
let prove_state_start = time::Instant::now();
match prove(da_client.get_mode(), starknet_block_hash, &csd, num_addr_accessed, madara_backend.clone())
.await
{
Err(err) => log::error!("proving error: {err}"),
Ok(()) => {}
}
let prove_state_end = time::Instant::now();

match update_state::<B, H>(madara_backend, da_client, starknet_block_hash, csd, num_addr_accessed).await
{
Err(err) => log::error!("state publishing error: {err}"),
Ok(()) => {}
};
let update_state_end = time::Instant::now();

if let Some(da_metrics) = da_metrics {
da_metrics
.state_proofs
.observe(prove_state_end.saturating_duration_since(prove_state_start).as_secs_f64());
da_metrics
.state_updates
.observe(update_state_end.saturating_duration_since(prove_state_end).as_secs_f64());
}
});
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ pub fn new_full(
Some(MADARA_TASK_GROUP),
DataAvailabilityWorker::<_, StarknetHasher>::prove_current_block(
da_client,
prometheus_registry.clone(),
commitment_state_diff_rx,
madara_backend.clone(),
),
Expand Down
Loading