Skip to content

Commit

Permalink
Get blob data from db
Browse files Browse the repository at this point in the history
  • Loading branch information
gianbelinche committed Dec 5, 2024
1 parent a9231f8 commit cd4dfc9
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 50 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions core/lib/dal/src/data_availability_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,43 @@ impl DataAvailabilityDal<'_, '_> {
})
.collect())
}

/// Fetches the pubdata for the L1 batch with a given blob id.
pub async fn get_blob_data_by_blob_id(
&mut self,
blob_id: &str,
) -> DalResult<Option<L1BatchDA>> {
let row = sqlx::query!(
r#"
SELECT
number,
pubdata_input
FROM
l1_batches
LEFT JOIN
data_availability
ON data_availability.l1_batch_number = l1_batches.number
WHERE
number != 0
AND data_availability.blob_id = $1
AND pubdata_input IS NOT NULL
ORDER BY
number
LIMIT
1
"#,
blob_id,
)
.instrument("get_blob_data_by_blob_id")
.with_arg("blob_id", &blob_id)
.fetch_optional(self.storage)
.await?
.map(|row| L1BatchDA {
// `unwrap` is safe here because we have a `WHERE` clause that filters out `NULL` values
pubdata: row.pubdata_input.unwrap(),
l1_batch_number: L1BatchNumber(row.number as u32),
});

Ok(row)
}
}
26 changes: 18 additions & 8 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{str::FromStr, sync::Arc};
use std::{future::Future, pin::Pin, str::FromStr, sync::Arc};

use async_trait::async_trait;
use secp256k1::SecretKey;
Expand All @@ -12,26 +12,36 @@ use zksync_da_client::{
use super::sdk::RawEigenClient;
use crate::utils::to_retriable_da_error;

type EigenFunctionReturn<'a> =
Pin<Box<dyn Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send + 'a>>;
pub trait EigenFunction: Clone + std::fmt::Debug + Send + Sync {
fn call(&self, input: &'_ str) -> EigenFunctionReturn;
}

/// EigenClient is a client for the Eigen DA service.
#[derive(Debug, Clone)]
pub struct EigenClient {
pub(crate) client: Arc<RawEigenClient>,
pub struct EigenClient<T: EigenFunction> {
pub(crate) client: Arc<RawEigenClient<T>>,
}

impl EigenClient {
pub async fn new(config: EigenConfig, secrets: EigenSecrets) -> anyhow::Result<Self> {
impl<T: EigenFunction> EigenClient<T> {
pub async fn new(
config: EigenConfig,
secrets: EigenSecrets,
function: Box<T>,
) -> anyhow::Result<Self> {
let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str())
.map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?;

let client = RawEigenClient::new(private_key, config).await?;
let client = RawEigenClient::new(private_key, config, function).await?;
Ok(Self {
client: Arc::new(client),
})
}
}

#[async_trait]
impl DataAvailabilityClient for EigenClient {
impl<T: EigenFunction + 'static> DataAvailabilityClient for EigenClient<T> {
async fn dispatch_blob(
&self,
_: u32, // batch number
Expand Down Expand Up @@ -66,6 +76,6 @@ impl DataAvailabilityClient for EigenClient {
}

fn blob_size_limit(&self) -> Option<usize> {
Some(RawEigenClient::blob_size_limit())
Some(RawEigenClient::<T>::blob_size_limit())
}
}
42 changes: 33 additions & 9 deletions core/node/da_clients/src/eigen/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ mod tests {
};
use zksync_types::secrets::PrivateKey;

use crate::eigen::{blob_info::BlobInfo, EigenClient};
use crate::eigen::{blob_info::BlobInfo, EigenClient, EigenFunction};

impl EigenClient {
impl<T: EigenFunction> EigenClient<T> {
pub async fn get_blob_data(
&self,
blob_id: BlobInfo,
Expand All @@ -35,8 +35,8 @@ mod tests {
const STATUS_QUERY_TIMEOUT: u64 = 1800000; // 30 minutes
const STATUS_QUERY_INTERVAL: u64 = 5; // 5 ms

async fn get_blob_info(
client: &EigenClient,
async fn get_blob_info<T: EigenFunction>(
client: &EigenClient<T>,
result: &DispatchResponse,
) -> anyhow::Result<BlobInfo> {
let blob_info = (|| async {
Expand All @@ -57,6 +57,20 @@ mod tests {
Ok(blob_info)
}

#[derive(Debug, Clone)]
struct MockEigenFunction;

impl EigenFunction for MockEigenFunction {
fn call(
&self,
_input: &'_ str,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send + '_>,
> {
Box::pin(async { Ok(None) })
}
}

#[ignore = "depends on external RPC"]
#[tokio::test]
#[serial]
Expand All @@ -77,7 +91,9 @@ mod tests {
)
.unwrap(),
};
let client = EigenClient::new(config.clone(), secrets).await.unwrap();
let client = EigenClient::new(config.clone(), secrets, Box::new(MockEigenFunction))
.await
.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();

Expand Down Expand Up @@ -114,7 +130,9 @@ mod tests {
)
.unwrap(),
};
let client = EigenClient::new(config.clone(), secrets).await.unwrap();
let client = EigenClient::new(config.clone(), secrets, Box::new(MockEigenFunction))
.await
.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info = get_blob_info(&client, &result).await.unwrap();
Expand Down Expand Up @@ -151,7 +169,9 @@ mod tests {
)
.unwrap(),
};
let client = EigenClient::new(config.clone(), secrets).await.unwrap();
let client = EigenClient::new(config.clone(), secrets, Box::new(MockEigenFunction))
.await
.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info = get_blob_info(&client, &result).await.unwrap();
Expand Down Expand Up @@ -188,7 +208,9 @@ mod tests {
)
.unwrap(),
};
let client = EigenClient::new(config.clone(), secrets).await.unwrap();
let client = EigenClient::new(config.clone(), secrets, Box::new(MockEigenFunction))
.await
.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info = get_blob_info(&client, &result).await.unwrap();
Expand Down Expand Up @@ -225,7 +247,9 @@ mod tests {
)
.unwrap(),
};
let client = EigenClient::new(config.clone(), secrets).await.unwrap();
let client = EigenClient::new(config.clone(), secrets, Box::new(MockEigenFunction))
.await
.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info = get_blob_info(&client, &result).await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions core/node/da_clients/src/eigen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ mod sdk;
mod verifier;
mod verifier_tests;

pub use self::client::EigenClient;

pub use self::client::{EigenClient, EigenFunction};
#[allow(clippy::all)]
pub(crate) mod disperser {
include!("generated/disperser.rs");
Expand Down
64 changes: 39 additions & 25 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::{
blob_info::BlobInfo,
disperser::BlobInfo as DisperserBlobInfo,
verifier::{Verifier, VerifierConfig},
EigenFunction,
};
use crate::eigen::{
blob_info,
Expand All @@ -29,19 +30,24 @@ use crate::eigen::{
};

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient {
pub(crate) struct RawEigenClient<T: EigenFunction> {
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: EigenConfig,
verifier: Verifier,
function: Box<T>,
}

pub(crate) const DATA_CHUNK_SIZE: usize = 32;

impl RawEigenClient {
impl<T: EigenFunction> RawEigenClient<T> {
const BLOB_SIZE_LIMIT: usize = 1024 * 1024 * 2; // 2 MB

pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result<Self> {
pub async fn new(
private_key: SecretKey,
config: EigenConfig,
function: Box<T>,
) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
let client = Arc::new(Mutex::new(DisperserClient::connect(endpoint).await?));
Expand Down Expand Up @@ -78,6 +84,7 @@ impl RawEigenClient {
private_key,
config,
verifier,
function,
})
}

Expand Down Expand Up @@ -145,31 +152,38 @@ impl RawEigenClient {
pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result<Option<BlobInfo>> {
let blob_info = self.try_get_inclusion_data(blob_id.to_string()).await?;

if let Some(blob_info) = blob_info {
let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
let Some(blob_info) = blob_info else {
return Ok(None);
};
let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;

let data = self.get_blob_data(blob_info.clone()).await?;
if data.is_none() {
return Err(anyhow::anyhow!("Failed to get blob data"));
}
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data.unwrap())
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

let result = self
.verifier
.verify_inclusion_data_against_settlement_layer(blob_info.clone())
.await;
if result.is_err() {
return Ok(None);
let Some(data) = self.get_blob_data(blob_info.clone()).await? else {
return Err(anyhow::anyhow!("Failed to get blob data"));
};
let data_db = self.function.call(blob_id).await?;
if let Some(data_db) = data_db {
if data_db != data {
return Err(anyhow::anyhow!(
"Data from db and from disperser are different"
));
}

tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);
Ok(Some(blob_info))
} else {
Ok(None)
}
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

let result = self
.verifier
.verify_inclusion_data_against_settlement_layer(blob_info.clone())
.await;
// in case of an error, the dispatcher will retry, so the need to return None
if result.is_err() {
return Ok(None);
}

tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);
Ok(Some(blob_info))
}

pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>> {
Expand Down
Loading

0 comments on commit cd4dfc9

Please sign in to comment.