Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-extra-features): Fix PR comments #369

Merged
merged 10 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ impl MainNodeBuilder {
};

let secrets = try_load_config!(self.secrets.data_availability);
let l1 = try_load_config!(self.secrets.l1);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not l1_secrets instead of just l1?

match (da_client_config, secrets) {
(DAClientConfig::Avail(config), DataAvailabilitySecrets::Avail(secret)) => {
self.node.add_layer(AvailWiringLayer::new(config, secret));
Expand All @@ -535,7 +536,10 @@ impl MainNodeBuilder {
.add_layer(CelestiaWiringLayer::new(config, secret));
}

(DAClientConfig::Eigen(config), DataAvailabilitySecrets::Eigen(secret)) => {
(DAClientConfig::Eigen(mut config), DataAvailabilitySecrets::Eigen(secret)) => {
if config.eigenda_eth_rpc.is_none() {
config.eigenda_eth_rpc = Some(l1.l1_rpc_url.expose_str().to_string());
}
self.node.add_layer(EigenWiringLayer::new(config, secret));
}

Expand Down
2 changes: 1 addition & 1 deletion core/lib/config/src/configs/da_client/eigen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct EigenConfig {
/// a value less or equal to 0 means that the disperser will not wait for finalization
pub settlement_layer_confirmation_depth: i32,
/// URL of the Ethereum RPC server
pub eigenda_eth_rpc: String,
pub eigenda_eth_rpc: Option<String>,
/// Address of the service manager contract
pub eigenda_svc_manager_address: String,
/// Wait for the blob to be finalized before returning the response
Expand Down
59 changes: 21 additions & 38 deletions core/lib/env_config/src/da_client.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use std::env;

use zksync_config::{
configs::{
da_client::{
avail::{
AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME,
AVAIL_GAS_RELAY_CLIENT_NAME,
},
celestia::CelestiaSecrets,
eigen::EigenSecrets,
DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME,
EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME,
use zksync_config::configs::{
da_client::{
avail::{
AvailClientConfig, AvailSecrets, AVAIL_FULL_CLIENT_NAME, AVAIL_GAS_RELAY_CLIENT_NAME,
},
secrets::DataAvailabilitySecrets,
AvailConfig,
celestia::CelestiaSecrets,
eigen::EigenSecrets,
DAClientConfig, AVAIL_CLIENT_CONFIG_NAME, CELESTIA_CLIENT_CONFIG_NAME,
EIGEN_CLIENT_CONFIG_NAME, OBJECT_STORE_CLIENT_CONFIG_NAME,
},
EigenConfig,
secrets::DataAvailabilitySecrets,
AvailConfig,
};

use crate::{envy_load, FromEnv};
Expand All @@ -38,20 +34,7 @@ impl FromEnv for DAClientConfig {
},
}),
CELESTIA_CLIENT_CONFIG_NAME => Self::Celestia(envy_load("da_celestia_config", "DA_")?),
EIGEN_CLIENT_CONFIG_NAME => Self::Eigen(EigenConfig {
disperser_rpc: env::var("EIGENDA_DISPERSER_RPC")?,
settlement_layer_confirmation_depth: env::var(
"EIGENDA_SETTLEMENT_LAYER_CONFIRMATION_DEPTH",
)?
.parse()?,
eigenda_eth_rpc: env::var("EIGENDA_EIGENDA_ETH_RPC")?,
eigenda_svc_manager_address: env::var("EIGENDA_EIGENDA_SVC_MANAGER_ADDRESS")?,
wait_for_finalization: env::var("EIGENDA_WAIT_FOR_FINALIZATION")?.parse()?,
authenticated: env::var("EIGENDA_AUTHENTICATED")?.parse()?,
g1_url: env::var("EIGENDA_G1_URL")?.parse()?,
g2_url: env::var("EIGENDA_G2_URL")?.parse()?,
chain_id: env::var("EIGENDA_CHAIN_ID")?.parse()?,
}),
EIGEN_CLIENT_CONFIG_NAME => Self::Eigen(envy_load("da_eigen_config", "DA_")?),
OBJECT_STORE_CLIENT_CONFIG_NAME => {
Self::ObjectStore(envy_load("da_object_store", "DA_")?)
}
Expand Down Expand Up @@ -265,15 +248,15 @@ mod tests {
let mut lock = MUTEX.lock();
let config = r#"
DA_CLIENT="Eigen"
EIGENDA_DISPERSER_RPC="http://localhost:8080"
EIGENDA_SETTLEMENT_LAYER_CONFIRMATION_DEPTH=0
EIGENDA_EIGENDA_ETH_RPC="http://localhost:8545"
EIGENDA_EIGENDA_SVC_MANAGER_ADDRESS="0x123"
EIGENDA_WAIT_FOR_FINALIZATION=true
EIGENDA_AUTHENTICATED=false
EIGENDA_G1_URL="resources1"
EIGENDA_G2_URL="resources2"
EIGENDA_CHAIN_ID=1
DA_DISPERSER_RPC="http://localhost:8080"
DA_SETTLEMENT_LAYER_CONFIRMATION_DEPTH=0
DA_EIGENDA_ETH_RPC="http://localhost:8545"
DA_EIGENDA_SVC_MANAGER_ADDRESS="0x123"
DA_WAIT_FOR_FINALIZATION=true
DA_AUTHENTICATED=false
DA_G1_URL="resources1"
DA_G2_URL="resources2"
DA_CHAIN_ID=1
"#;
lock.set_env(config);

Expand All @@ -283,7 +266,7 @@ mod tests {
DAClientConfig::Eigen(EigenConfig {
disperser_rpc: "http://localhost:8080".to_string(),
settlement_layer_confirmation_depth: 0,
eigenda_eth_rpc: "http://localhost:8545".to_string(),
eigenda_eth_rpc: Some("http://localhost:8545".to_string()),
eigenda_svc_manager_address: "0x123".to_string(),
wait_for_finalization: true,
authenticated: false,
Expand Down
9 changes: 5 additions & 4 deletions core/lib/protobuf_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ impl ProtoRepr for proto::DataAvailabilityClient {
&conf.settlement_layer_confirmation_depth,
)
.context("settlement_layer_confirmation_depth")?,
eigenda_eth_rpc: required(&conf.eigenda_eth_rpc)
.context("eigenda_eth_rpc")?
.clone(),
eigenda_eth_rpc: match required(&conf.eigenda_eth_rpc) {
Ok(rpc) => Some(rpc.clone()),
Err(_) => None,
},
eigenda_svc_manager_address: required(&conf.eigenda_svc_manager_address)
.context("eigenda_svc_manager_address")?
.clone(),
Expand Down Expand Up @@ -117,7 +118,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
settlement_layer_confirmation_depth: Some(
config.settlement_layer_confirmation_depth,
),
eigenda_eth_rpc: Some(config.eigenda_eth_rpc.clone()),
eigenda_eth_rpc: config.eigenda_eth_rpc.clone(),
eigenda_svc_manager_address: Some(config.eigenda_svc_manager_address.clone()),
wait_for_finalization: Some(config.wait_for_finalization),
authenticated: Some(config.authenticated),
Expand Down
3 changes: 3 additions & 0 deletions core/node/da_clients/src/eigen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ pub fn compile_protos() {

The generated folder is considered a temporary solution until the EigenDA has a library with either a protogen, or
preferably a full Rust client implementation.

proto files are not included here to not create confusion in case they are not updated in time, so the EigenDA
[repo](https://github.com/Layr-Labs/eigenda/tree/master/api/proto) has to be a source of truth for the proto files.
10 changes: 5 additions & 5 deletions core/node/da_clients/src/eigen/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: -1,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: false,
Expand Down Expand Up @@ -111,7 +111,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: -1,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: true,
Expand Down Expand Up @@ -155,7 +155,7 @@ mod tests {
g1_url: "https://github.com/Layr-Labs/eigenda-proxy/raw/2fd70b99ef5bf137d7bbca3461cf9e1f2c899451/resources/g1.point".to_string(),
g2_url: "https://github.com/Layr-Labs/eigenda-proxy/raw/2fd70b99ef5bf137d7bbca3461cf9e1f2c899451/resources/g2.point.powerOf2".to_string(),
settlement_layer_confirmation_depth: 0,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
chain_id: 17000,
};
Expand Down Expand Up @@ -191,7 +191,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: 5,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: false,
Expand Down Expand Up @@ -231,7 +231,7 @@ mod tests {
let config = EigenConfig {
disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(),
settlement_layer_confirmation_depth: 5,
eigenda_eth_rpc: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
eigenda_eth_rpc: Some("https://ethereum-holesky-rpc.publicnode.com".to_string()),
eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
wait_for_finalization: false,
authenticated: true,
Expand Down
42 changes: 31 additions & 11 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::eigen::{
disperser_client::DisperserClient,
AuthenticatedReply, BlobAuthHeader,
},
verifier::VerificationError,
};

#[derive(Debug, Clone)]
Expand All @@ -53,7 +54,10 @@ impl<T: GetBlobData> RawEigenClient<T> {
let client = Arc::new(Mutex::new(DisperserClient::connect(endpoint).await?));

let verifier_config = VerifierConfig {
rpc_url: config.eigenda_eth_rpc.clone(),
rpc_url: config
.eigenda_eth_rpc
.clone()
.ok_or(anyhow::anyhow!("EigenDA ETH RPC not set"))?,
svc_manager_addr: config.eigenda_svc_manager_address.clone(),
max_blob_size: Self::BLOB_SIZE_LIMIT as u32,
g1_url: config.g1_url.clone(),
Expand Down Expand Up @@ -109,7 +113,13 @@ impl<T: GetBlobData> RawEigenClient<T> {
.await?
.into_inner();

Ok(hex::encode(disperse_reply.request_id))
match disperser::BlobStatus::try_from(disperse_reply.result)? {
disperser::BlobStatus::Failed
| disperser::BlobStatus::InsufficientSignatures
| disperser::BlobStatus::Unknown => Err(anyhow::anyhow!("Blob dispatch failed")),

_ => Ok(hex::encode(disperse_reply.request_id)),
}
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
Expand Down Expand Up @@ -147,11 +157,18 @@ impl<T: GetBlobData> RawEigenClient<T> {
let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else {
return Err(anyhow::anyhow!("Unexpected response from server"));
};
Ok(hex::encode(disperse_reply.request_id))

match disperser::BlobStatus::try_from(disperse_reply.result)? {
disperser::BlobStatus::Failed
| disperser::BlobStatus::InsufficientSignatures
| disperser::BlobStatus::Unknown => Err(anyhow::anyhow!("Blob dispatch failed")),

_ => Ok(hex::encode(disperse_reply.request_id)),
}
}

pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result<Option<BlobInfo>> {
let blob_info = self.try_get_inclusion_data(blob_id.to_string()).await?;
pub async fn get_commitment(&self, request_id: &str) -> anyhow::Result<Option<BlobInfo>> {
let blob_info = self.try_get_inclusion_data(request_id.to_string()).await?;

let Some(blob_info) = blob_info else {
return Ok(None);
Expand All @@ -162,7 +179,7 @@ impl<T: GetBlobData> RawEigenClient<T> {
let Some(data) = self.get_blob_data(blob_info.clone()).await? else {
return Err(anyhow::anyhow!("Failed to get blob data"));
};
let data_db = self.get_blob_data.call(blob_id).await?;
let data_db = self.get_blob_data.call(request_id).await?;
if let Some(data_db) = data_db {
if data_db != data {
return Err(anyhow::anyhow!(
Expand All @@ -179,16 +196,19 @@ impl<T: GetBlobData> RawEigenClient<T> {
.verify_inclusion_data_against_settlement_layer(blob_info.clone())
.await;
// in case of an error, the dispatcher will retry, so the need to return None
if result.is_err() {
return Ok(None);
if let Err(e) = result {
match e {
VerificationError::EmptyHash => return Ok(None),
_ => return Err(anyhow::anyhow!("Failed to verify inclusion data: {:?}", e)),
}
}

tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);
tracing::info!("Blob dispatch confirmed, request id: {}", request_id);
Ok(Some(blob_info))
}

pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>> {
let blob_info = self.get_commitment(blob_id).await?;
pub async fn get_inclusion_data(&self, request_id: &str) -> anyhow::Result<Option<Vec<u8>>> {
let blob_info = self.get_commitment(request_id).await?;
if let Some(blob_info) = blob_info {
Ok(Some(blob_info.blob_verification_proof.inclusion_proof))
} else {
Expand Down
13 changes: 8 additions & 5 deletions core/node/da_clients/src/eigen/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ impl Clone for Verifier {
impl Verifier {
pub const DEFAULT_PRIORITY_FEE_PER_GAS: u64 = 100;
pub const SRSORDER: u32 = 268435456; // 2 ^ 28
pub const G1POINT: &'static str = "g1.point";
pub const G2POINT: &'static str = "g2.point.powerOf2";
pub const POINT_SIZE: u32 = 32;

async fn save_point(url: String, point: String) -> Result<(), VerificationError> {
let url = Url::parse(&url).map_err(|_| VerificationError::LinkError)?;
Expand All @@ -120,21 +123,21 @@ impl Verifier {
Ok(())
}
async fn save_points(url_g1: String, url_g2: String) -> Result<String, VerificationError> {
Self::save_point(url_g1.clone(), "g1.point".to_string()).await?;
Self::save_point(url_g2.clone(), "g2.point.powerOf2".to_string()).await?;
Self::save_point(url_g1.clone(), Self::G1POINT.to_string()).await?;
Self::save_point(url_g2.clone(), Self::G2POINT.to_string()).await?;

Ok(".".to_string())
}
pub async fn new<T: VerifierClient + 'static>(
cfg: VerifierConfig,
signing_client: T,
) -> Result<Self, VerificationError> {
let srs_points_to_load = cfg.max_blob_size / 32;
let srs_points_to_load = cfg.max_blob_size / Self::POINT_SIZE;
let path = Self::save_points(cfg.clone().g1_url, cfg.clone().g2_url).await?;
let kzg = Kzg::setup(
&format!("{}{}", path, "/g1.point"),
&format!("{}/{}", path, Self::G1POINT),
"",
&format!("{}{}", path, "/g2.point.powerOf2"),
&format!("{}/{}", path, Self::G2POINT),
Self::SRSORDER,
srs_points_to_load,
"".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl WiringLayer for EigenWiringLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let master_pool = input.master_pool.get_custom(2).await?;
let master_pool = input.master_pool.get().await?;
let get_blob_from_db = GetBlobFromDB { pool: master_pool };
let client: Box<dyn DataAvailabilityClient> = Box::new(
EigenClient::new(self.config, self.secrets, Box::new(get_blob_from_db)).await?,
Expand All @@ -69,7 +69,7 @@ impl GetBlobData for GetBlobFromDB {
async fn call(&self, input: &'_ str) -> anyhow::Result<Option<Vec<u8>>> {
let pool = self.pool.clone();
let input = input.to_string();
let mut conn = pool.connection_tagged("da_dispatcher").await?;
let mut conn = pool.connection_tagged("eigen_client").await?;
let batch = conn
.data_availability_dal()
.get_blob_data_by_blob_id(&input)
Expand Down
Loading