Skip to content

Commit

Permalink
feat(eigen-client-extra-features): address PR comments (part 5) (#377)
Browse files Browse the repository at this point in the history
* use trait object

* prevent blocking non async code

* clippy suggestion
  • Loading branch information
juan518munoz authored Dec 19, 2024
1 parent 92d5ae6 commit 1747f97
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 27 deletions.
16 changes: 9 additions & 7 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@ use super::sdk::RawEigenClient;
use crate::utils::to_retriable_da_error;

#[async_trait]
pub trait GetBlobData: Clone + std::fmt::Debug + Send + Sync {
pub trait GetBlobData: std::fmt::Debug + Send + Sync {
async fn get_blob_data(&self, input: &str) -> anyhow::Result<Option<Vec<u8>>>;

fn clone_boxed(&self) -> Box<dyn GetBlobData>;
}

/// EigenClient is a client for the Eigen DA service.
#[derive(Debug, Clone)]
pub struct EigenClient<T: GetBlobData> {
pub(crate) client: Arc<RawEigenClient<T>>,
pub struct EigenClient {
pub(crate) client: Arc<RawEigenClient>,
}

impl<T: GetBlobData> EigenClient<T> {
impl EigenClient {
pub async fn new(
config: EigenConfig,
secrets: EigenSecrets,
get_blob_data: Box<T>,
get_blob_data: Box<dyn GetBlobData>,
) -> anyhow::Result<Self> {
let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str())
.map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?;
Expand All @@ -40,7 +42,7 @@ impl<T: GetBlobData> EigenClient<T> {
}

#[async_trait]
impl<T: GetBlobData + 'static> DataAvailabilityClient for EigenClient<T> {
impl DataAvailabilityClient for EigenClient {
async fn dispatch_blob(
&self,
_: u32, // batch number
Expand Down Expand Up @@ -75,6 +77,6 @@ impl<T: GetBlobData + 'static> DataAvailabilityClient for EigenClient<T> {
}

fn blob_size_limit(&self) -> Option<usize> {
Some(RawEigenClient::<T>::blob_size_limit())
Some(RawEigenClient::blob_size_limit())
}
}
10 changes: 7 additions & 3 deletions core/node/da_clients/src/eigen/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod tests {

use crate::eigen::{blob_info::BlobInfo, EigenClient, GetBlobData};

impl<T: GetBlobData> EigenClient<T> {
impl EigenClient {
pub async fn get_blob_data(
&self,
blob_id: BlobInfo,
Expand All @@ -32,8 +32,8 @@ mod tests {
const STATUS_QUERY_TIMEOUT: u64 = 1800000; // 30 minutes
const STATUS_QUERY_INTERVAL: u64 = 5; // 5 ms

async fn get_blob_info<T: GetBlobData>(
client: &EigenClient<T>,
async fn get_blob_info(
client: &EigenClient,
result: &DispatchResponse,
) -> anyhow::Result<BlobInfo> {
let blob_info = (|| async {
Expand Down Expand Up @@ -62,6 +62,10 @@ mod tests {
async fn get_blob_data(&self, _input: &'_ str) -> anyhow::Result<Option<Vec<u8>>> {
Ok(None)
}

fn clone_boxed(&self) -> Box<dyn GetBlobData> {
Box::new(self.clone())
}
}

#[ignore = "depends on external RPC"]
Expand Down
22 changes: 17 additions & 5 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,36 @@ use crate::eigen::{
verifier::VerificationError,
};

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient<T: GetBlobData> {
#[derive(Debug)]
pub(crate) struct RawEigenClient {
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: EigenConfig,
verifier: Verifier,
get_blob_data: Box<T>,
get_blob_data: Box<dyn GetBlobData>,
}

impl Clone for RawEigenClient {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
private_key: self.private_key,
config: self.config.clone(),
verifier: self.verifier.clone(),
get_blob_data: self.get_blob_data.clone_boxed(),
}
}
}

pub(crate) const DATA_CHUNK_SIZE: usize = 32;

impl<T: GetBlobData> RawEigenClient<T> {
impl RawEigenClient {
const BLOB_SIZE_LIMIT: usize = 1024 * 1024 * 2; // 2 MB

pub async fn new(
private_key: SecretKey,
config: EigenConfig,
get_blob_data: Box<T>,
get_blob_data: Box<dyn GetBlobData>,
) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
Expand Down
32 changes: 20 additions & 12 deletions core/node/da_clients/src/eigen/verifier.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{collections::HashMap, fs::File, io::copy, path::Path};
use std::{collections::HashMap, path::Path};

use ark_bn254::{Fq, G1Affine};
use ethabi::{encode, ParamType, Token};
use rust_kzg_bn254::{blob::Blob, kzg::Kzg, polynomial::PolynomialFormat};
use tokio::{fs::File, io::AsyncWriteExt};
use url::Url;
use zksync_basic_types::web3::CallRequest;
use zksync_eth_client::{clients::PKSigningClient, EnrichedClientResult};
Expand Down Expand Up @@ -112,12 +113,14 @@ impl Verifier {
}
let path = format!("./{}", point);
let path = Path::new(&path);
let mut file = File::create(path).map_err(|_| VerificationError::LinkError)?;
let mut file = File::create(path).await.map_err(|_| VerificationError::LinkError)?;
let content = response
.bytes()
.await
.map_err(|_| VerificationError::LinkError)?;
copy(&mut content.as_ref(), &mut file).map_err(|_| VerificationError::LinkError)?;
file.write_all(&content)
.await
.map_err(|_| VerificationError::LinkError)?;
Ok(())
}
async fn save_points(url_g1: Url, url_g2: Url) -> Result<String, VerificationError> {
Expand All @@ -132,15 +135,20 @@ impl Verifier {
) -> Result<Self, VerificationError> {
let srs_points_to_load = cfg.max_blob_size / Self::POINT_SIZE;
let path = Self::save_points(cfg.clone().g1_url, cfg.clone().g2_url).await?;
let kzg = Kzg::setup(
&format!("{}/{}", path, Self::G1POINT),
"",
&format!("{}/{}", path, Self::G2POINT),
Self::SRSORDER,
srs_points_to_load,
"".to_string(),
);
let kzg = kzg.map_err(|e| {
let kzg_handle = tokio::task::spawn_blocking(move || {
Kzg::setup(
&format!("{}/{}", path, Self::G1POINT),
"",
&format!("{}/{}", path, Self::G2POINT),
Self::SRSORDER,
srs_points_to_load,
"".to_string(),
)
});
let kzg = kzg_handle.await.map_err(|e| {
tracing::error!("Failed to setup KZG: {:?}", e);
VerificationError::KzgError
})?.map_err(|e| {
tracing::error!("Failed to setup KZG: {:?}", e);
VerificationError::KzgError
})?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,8 @@ impl GetBlobData for GetBlobFromDB {
.await?;
Ok(batch.map(|b| b.pubdata))
}

fn clone_boxed(&self) -> Box<dyn GetBlobData> {
Box::new(self.clone())
}
}

0 comments on commit 1747f97

Please sign in to comment.