bowen's EigenDA integration alt approach #2

Open · wants to merge 8 commits into base: main
3 changes: 3 additions & 0 deletions .gitignore
@@ -9,3 +9,6 @@ monorepo/

# Environment Variables
.env

# kona-host data-dir
data/
32 changes: 29 additions & 3 deletions bin/client/justfile
@@ -62,15 +62,42 @@ run-client-asterisc block_number l1_rpc l1_beacon_rpc l2_rpc rollup_node_rpc ver
--data-dir ./data \
{{verbosity}}

# Run the client program natively with the host program attached, against the op-devnet.
run-client-native-against-devnet verbosity='' block_number='' rollup_config_path='':
#!/usr/bin/env bash
L1_RPC="http://127.0.0.1:8545"
L1_BEACON_RPC="http://127.0.0.1:5052"
L2_RPC="http://127.0.0.1:9545"
ROLLUP_NODE_RPC="http://127.0.0.1:7545"
ROLLUP_CONFIG_PATH="../../../optimism/.devnet/rollup.json"

if [ -z "{{block_number}}" ]; then
BLOCK_NUMBER=$(cast block finalized --json --rpc-url $L2_RPC | jq -r .number | cast 2d)
else
BLOCK_NUMBER={{block_number}}
fi

just run-client-native $BLOCK_NUMBER \
$L1_RPC $L1_BEACON_RPC $L2_RPC $ROLLUP_NODE_RPC \
$ROLLUP_CONFIG_PATH {{verbosity}}
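
# Example invocations (assumes the devnet RPC ports hard-coded above; the block number is a placeholder):
#   just run-client-native-against-devnet              # prove the latest finalized L2 block
#   just run-client-native-against-devnet -vv 1234     # explicit verbosity, then an explicit block number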

# Run the client program natively with the host program attached.
run-client-native block_number l1_rpc l1_beacon_rpc l2_rpc rollup_node_rpc verbosity='':
run-client-native block_number l1_rpc l1_beacon_rpc l2_rpc rollup_node_rpc rollup_config_path='' verbosity='':
#!/usr/bin/env bash
set -o errexit -o nounset -o pipefail

L1_NODE_ADDRESS="{{l1_rpc}}"
L1_BEACON_ADDRESS="{{l1_beacon_rpc}}"
L2_NODE_ADDRESS="{{l2_rpc}}"
OP_NODE_ADDRESS="{{rollup_node_rpc}}"

L2_CHAIN_ID=$(cast chain-id --rpc-url $L2_NODE_ADDRESS)
if [ -z "{{rollup_config_path}}" ]; then
CHAIN_ID_OR_ROLLUP_CONFIG_ARG="--l2-chain-id $L2_CHAIN_ID"
else
CHAIN_ID_OR_ROLLUP_CONFIG_ARG="--rollup-config-path $(realpath {{rollup_config_path}})"
fi

CLAIMED_L2_BLOCK_NUMBER={{block_number}}
echo "Fetching configuration for block #$CLAIMED_L2_BLOCK_NUMBER..."

@@ -82,7 +109,6 @@ run-client-native block_number l1_rpc l1_beacon_rpc l2_rpc rollup_node_rpc verbo
AGREED_L2_HEAD_HASH=$(cast block --rpc-url $L2_NODE_ADDRESS $((CLAIMED_L2_BLOCK_NUMBER - 1)) --json | jq -r .hash)
L1_ORIGIN_NUM=$(cast rpc --rpc-url $OP_NODE_ADDRESS "optimism_outputAtBlock" $(cast 2h $((CLAIMED_L2_BLOCK_NUMBER - 1))) | jq -r .blockRef.l1origin.number)
L1_HEAD=$(cast block --rpc-url $L1_NODE_ADDRESS $((L1_ORIGIN_NUM + 30)) --json | jq -r .hash)
L2_CHAIN_ID=$(cast chain-id --rpc-url $L2_NODE_ADDRESS)

# Move to the workspace root
cd $(git rev-parse --show-toplevel)
@@ -94,12 +120,12 @@ run-client-native block_number l1_rpc l1_beacon_rpc l2_rpc rollup_node_rpc verbo
--claimed-l2-output-root $CLAIMED_L2_OUTPUT_ROOT \
--agreed-l2-output-root $AGREED_L2_OUTPUT_ROOT \
--claimed-l2-block-number $CLAIMED_L2_BLOCK_NUMBER \
--l2-chain-id $L2_CHAIN_ID \
--l1-node-address $L1_NODE_ADDRESS \
--l1-beacon-address $L1_BEACON_ADDRESS \
--l2-node-address $L2_NODE_ADDRESS \
--native \
--data-dir ./data \
$CHAIN_ID_OR_ROLLUP_CONFIG_ARG \
{{verbosity}}

# Run the client program natively with the host program attached, in offline mode.
3 changes: 3 additions & 0 deletions bin/client/src/lib.rs
@@ -21,6 +21,7 @@ use kona_proof::{
executor::KonaExecutor,
l1::{OracleBlobProvider, OracleL1ChainProvider, OraclePipeline},
l2::OracleL2ChainProvider,
altda::OracleEigenDAProvider,
sync::new_pipeline_cursor,
BootInfo, CachingOracle, HintType,
};
@@ -74,6 +75,7 @@ where
let mut l1_provider = OracleL1ChainProvider::new(boot.clone(), oracle.clone());
let mut l2_provider = OracleL2ChainProvider::new(boot.clone(), oracle.clone());
let beacon = OracleBlobProvider::new(oracle.clone());
let eigenda_blob_provider = OracleEigenDAProvider::new(oracle.clone());

// If the claimed L2 block number is less than the safe head of the L2 chain, the claim is
// invalid.
@@ -115,6 +117,7 @@ where
beacon,
l1_provider.clone(),
l2_provider.clone(),
eigenda_blob_provider.clone(),
);
let executor = KonaExecutor::new(&cfg, l2_provider.clone(), l2_provider, handle_register, None);
let mut driver = Driver::new(cursor, executor, pipeline);
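OracleEigenDAProvider's implementation is not part of this diff. A minimal sketch of the client-side fetch it is expected to perform for an AltDA certificate, assuming the usual kona hint/preimage flow; the HintType::encode_with helper and the kona_preimage paths below are assumed names, not confirmed by this diff:

use alloy_primitives::{keccak256, Bytes};
use kona_preimage::{
    errors::PreimageOracleResult, HintWriterClient, PreimageKey, PreimageKeyType,
    PreimageOracleClient,
};
use kona_proof::HintType;

/// Sketch only: send the cert to the host as an AltDACommitment hint, then read the
/// blob back under the same key the host fetcher writes (keccak256(cert) / GlobalGeneric).
async fn fetch_eigenda_blob_preimage<O>(oracle: &O, cert: &Bytes) -> PreimageOracleResult<Vec<u8>>
where
    O: PreimageOracleClient + HintWriterClient,
{
    // Ask the host to prefetch the blob behind this certificate (see fetcher::prefetch below).
    oracle.write(&HintType::AltDACommitment.encode_with(&[cert.as_ref()])).await?;
    // Read it back under the key the host wrote.
    oracle.get(PreimageKey::new(*keccak256(cert), PreimageKeyType::GlobalGeneric)).await
}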
14 changes: 12 additions & 2 deletions bin/host/src/cli/mod.rs
@@ -2,6 +2,7 @@

use crate::{
blobs::OnlineBlobProvider,
eigenda_blobs::OnlineEigenDABlobProvider,
kv::{
DiskKeyValueStore, LocalKeyValueStore, MemoryKeyValueStore, SharedKeyValueStore,
SplitKeyValueStore,
@@ -36,6 +37,8 @@ mode, the host runs the client program in a separate thread with the pre-image s
primary thread.
";

const EIGENDA_ADDRESS: &str = "127.0.0.1:31001";

/// The host binary CLI application arguments.
#[derive(Default, Parser, Serialize, Clone, Debug)]
#[command(about = ABOUT, version, styles = cli_styles())]
@@ -145,20 +148,27 @@ impl HostCli {
/// - A [ReqwestProvider] for the L2 node.
pub async fn create_providers(
&self,
) -> Result<(ReqwestProvider, OnlineBlobProvider, ReqwestProvider)> {
) -> Result<(ReqwestProvider, OnlineBlobProvider, OnlineEigenDABlobProvider, ReqwestProvider)> {
let blob_provider = OnlineBlobProvider::new_http(
self.l1_beacon_address.clone().ok_or(anyhow!("Beacon API URL must be set"))?,
)
.await
.map_err(|e| anyhow!("Failed to load blob provider configuration: {e}"))?;

let eigenda_blob_provider = OnlineEigenDABlobProvider::new_http(
//EIGENDA_ADDRESS.to_string(),
"http://127.0.0.1:3100".to_string(),
).await
.map_err(|e| anyhow!("Failed to load eigenda blob provider configuration: {e}"))?;

let l1_provider = Self::http_provider(
self.l1_node_address.as_ref().ok_or(anyhow!("Provider must be set"))?,
);
let l2_provider = Self::http_provider(
self.l2_node_address.as_ref().ok_or(anyhow!("L2 node address must be set"))?,
);

Ok((l1_provider, blob_provider, l2_provider))
Ok((l1_provider, blob_provider, eigenda_blob_provider, l2_provider))
}

/// Parses the CLI arguments and returns a new instance of a [SharedKeyValueStore], as it is
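The EigenDA proxy URL is hard-coded in create_providers and the EIGENDA_ADDRESS constant above is currently unused. A sketch of how the URL could instead come from a CLI flag on HostCli, mirroring the existing provider-address fields; the field and flag name are hypothetical, not part of this PR:

/// URL of the EigenDA proxy used to resolve AltDA certificates (hypothetical flag).
#[clap(long, visible_alias = "eigenda-proxy")]
pub eigenda_proxy_address: Option<String>,

// ...and in create_providers, fall back to the current default when the flag is unset:
let eigenda_blob_provider = OnlineEigenDABlobProvider::new_http(
    self.eigenda_proxy_address
        .clone()
        .unwrap_or_else(|| "http://127.0.0.1:3100".to_string()),
)
.await
.map_err(|e| anyhow!("Failed to load eigenda blob provider configuration: {e}"))?;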
44 changes: 44 additions & 0 deletions bin/host/src/eigenda_blobs.rs
@@ -0,0 +1,44 @@
use alloy_primitives::Bytes;
use reqwest::Client;

/// An online implementation of the [EigenDABlobProvider] trait.
#[derive(Debug, Clone)]
pub struct OnlineEigenDABlobProvider {
/// The base url.
base: String,
/// The inner reqwest client. Used to talk to proxy
inner: Client,
}

const GET_METHOD: &str = "get";

impl OnlineEigenDABlobProvider {
/// Creates a new instance of the [OnlineEigenDABlobProvider].
///
/// `base` is the base URL of the EigenDA proxy that serves blobs for AltDA certificates.
pub async fn new_http(base: String) -> Result<Self, anyhow::Error> {
let inner = Client::new();
Ok(Self { base, inner })
}

/// Fetches an EigenDA blob from the proxy for the given AltDA certificate.
///
/// The leading byte of `cert` is stripped when building the request path.
pub async fn fetch_eigenda_blob(
&self,
cert: &Bytes,
) -> Result<alloy_rlp::Bytes, reqwest::Error> {
let url = format!("{}/{}/{}", self.base, GET_METHOD, cert.slice(1..));

let raw_response = self.inner
.get(url)
.send()
.await?;

raw_response.bytes().await
}


}
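A minimal host-side usage sketch for the new provider, assuming an eigenda-proxy is listening on the same hard-coded URL used in cli/mod.rs and that the crate path kona_host::eigenda_blobs is correct; the certificate bytes are a placeholder (the leading byte is stripped by fetch_eigenda_blob):

use alloy_primitives::Bytes;
use kona_host::eigenda_blobs::OnlineEigenDABlobProvider;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let provider =
        OnlineEigenDABlobProvider::new_http("http://127.0.0.1:3100".to_string()).await?;

    // Placeholder certificate: a leading prefix byte followed by dummy commitment bytes.
    let cert = Bytes::from(vec![0x01, 0xde, 0xad, 0xbe, 0xef]);

    // Issues GET {base}/get/0xdeadbeef against the proxy and returns the raw blob bytes.
    let blob = provider.fetch_eigenda_blob(&cert).await?;
    println!("fetched {} blob bytes", blob.len());
    Ok(())
}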
31 changes: 28 additions & 3 deletions bin/host/src/fetcher/mod.rs
@@ -1,7 +1,7 @@
//! This module contains the [Fetcher] struct, which is responsible for fetching preimages from a
//! remote source.

use crate::{blobs::OnlineBlobProvider, kv::KeyValueStore};
use crate::{blobs::OnlineBlobProvider, eigenda_blobs::OnlineEigenDABlobProvider, kv::KeyValueStore};
use alloy_consensus::{Header, TxEnvelope, EMPTY_ROOT_HASH};
use alloy_eips::{
eip2718::Encodable2718,
@@ -22,7 +22,7 @@ use op_alloy_protocol::BlockInfo;
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};

mod precompiles;

@@ -38,6 +38,8 @@
l1_provider: ReqwestProvider,
/// The blob provider
blob_provider: OnlineBlobProvider,
/// The eigenda provider
eigenda_blob_provider: OnlineEigenDABlobProvider,
/// L2 chain provider.
l2_provider: ReqwestProvider,
/// L2 head
@@ -55,10 +57,11 @@
kv_store: Arc<RwLock<KV>>,
l1_provider: ReqwestProvider,
blob_provider: OnlineBlobProvider,
eigenda_blob_provider: OnlineEigenDABlobProvider,
l2_provider: ReqwestProvider,
l2_head: B256,
) -> Self {
Self { kv_store, l1_provider, blob_provider, l2_provider, l2_head, last_hint: None }
Self { kv_store, l1_provider, blob_provider, eigenda_blob_provider, l2_provider, l2_head, last_hint: None }
}

/// Set the last hint to be received.
@@ -97,6 +100,7 @@

/// Fetch the preimage for the given hint and insert it into the key-value store.
async fn prefetch(&self, hint: &str) -> Result<()> {
trace!(target: "fetcher", "prefetch: {hint}");
let hint = Hint::parse(hint)?;
let (hint_type, hint_data) = hint.split();
trace!(target: "fetcher", "Fetching hint: {hint_type} {hint_data}");
@@ -541,6 +545,27 @@
kv_write_lock.set(key.into(), preimage.into())?;
}
}
HintType::AltDACommitment => {
let cert = hint_data;
info!(target: "fetcher", "Fetching AltDACommitment cert: {:?}", cert);
// Fetch the EigenDA blob behind the AltDA certificate from the proxy.
let eigenda_blob = self
.eigenda_blob_provider
.fetch_eigenda_blob(&cert)
.await
.map_err(|e| anyhow!("Failed to fetch eigenda blob: {e}"))?;

info!(target: "fetcher", "eigenda_blob len {}", eigenda_blob.len());
// Acquire a lock on the key-value store and set the preimages.
let mut kv_write_lock = self.kv_store.write().await;

// Set the preimage for the blob, keyed by the hash of the commitment.
kv_write_lock.set(
PreimageKey::new(*keccak256(cert), PreimageKeyType::GlobalGeneric).into(),
eigenda_blob.to_vec(),
)?;

}
}

Ok(())
9 changes: 7 additions & 2 deletions bin/host/src/lib.rs
@@ -3,6 +3,7 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod blobs;
pub mod eigenda_blobs;
pub mod cli;
pub use cli::{init_tracing_subscriber, HostCli};

@@ -34,11 +35,12 @@ pub async fn start_server(cfg: HostCli) -> Result<()> {
let hint_reader = HintReader::new(hint_chan);
let kv_store = cfg.construct_kv_store();
let fetcher = if !cfg.is_offline() {
let (l1_provider, blob_provider, l2_provider) = cfg.create_providers().await?;
let (l1_provider, blob_provider, eigenda_blob_provider, l2_provider) = cfg.create_providers().await?;
Some(Arc::new(RwLock::new(Fetcher::new(
kv_store.clone(),
l1_provider,
blob_provider,
eigenda_blob_provider,
l2_provider,
cfg.agreed_l2_head_hash,
))))
@@ -69,18 +71,21 @@ pub async fn start_server_and_native_client(cfg: HostCli) -> Result<i32> {
let preimage_chan = BidirectionalChannel::new()?;
let kv_store = cfg.construct_kv_store();
let fetcher = if !cfg.is_offline() {
let (l1_provider, blob_provider, l2_provider) = cfg.create_providers().await?;
let (l1_provider, blob_provider, eigenda_blob_provider, l2_provider) = cfg.create_providers().await?;
Some(Arc::new(RwLock::new(Fetcher::new(
kv_store.clone(),
l1_provider,
blob_provider,
eigenda_blob_provider,
l2_provider,
cfg.agreed_l2_head_hash,
))))
} else {
None
};

info!(target: "host", "Fetcher configured (offline mode: {})", cfg.is_offline());

// Create the server and start it.
let server_task = task::spawn(start_native_preimage_server(
kv_store,
2 changes: 2 additions & 0 deletions crates/derive/src/attributes/stateful.rs
@@ -171,6 +171,8 @@ where
parent_beacon_root = Some(l1_header.parent_beacon_block_root.unwrap_or_default());
}

info!(target: "prepare_payload_attributes", "Preparing payload attributes with {} transactions", txs.len());

Ok(OpPayloadAttributes {
payload_attributes: PayloadAttributes {
timestamp: next_l2_time,
1 change: 1 addition & 0 deletions crates/derive/src/pipeline/core.rs
@@ -172,6 +172,7 @@ where
PipelineErrorKind::Temporary(PipelineError::Eof) => {
trace!(target: "pipeline", "Pipeline advancing origin");
if let Err(e) = self.attributes.advance_origin().await {
warn!(target: "pipeline", "Failed to advance origin: {:?}", e);
return StepResult::OriginAdvanceErr(e);
}
StepResult::AdvancedOrigin
68 changes: 68 additions & 0 deletions crates/derive/src/sources/eigenda.rs
@@ -0,0 +1,68 @@
//! Contains the [EigenDADataSource], which is a concrete implementation of the
//! [DataAvailabilityProvider] trait for the EigenDA protocol.
use crate::{
sources::{EigenDABlobSource, EthereumDataSource},
traits::{BlobProvider, ChainProvider, DataAvailabilityProvider, EigenDABlobProvider},
types::PipelineResult,
};
use alloc::{boxed::Box, fmt::Debug};
use alloy_primitives::Bytes;
use async_trait::async_trait;
use op_alloy_protocol::BlockInfo;
use tracing::info;

/// A factory for creating an EigenDA data source provider.
#[derive(Debug, Clone)]
pub struct EigenDADataSource<C, B, A>
where
C: ChainProvider + Send + Clone,
B: BlobProvider + Send + Clone,
A: EigenDABlobProvider + Send + Clone,
{
/// The Ethereum data source (calldata and blobs).
pub ethereum_source: EthereumDataSource<C, B>,
/// The EigenDA blob source.
pub eigenda_source: EigenDABlobSource<A>,
}

impl<C, B, A> EigenDADataSource<C, B, A>
where
C: ChainProvider + Send + Clone + Debug,
B: BlobProvider + Send + Clone + Debug,
A: EigenDABlobProvider + Send + Clone + Debug,
{
/// Instantiates a new [EigenDADataSource].
pub const fn new(
ethereum_source: EthereumDataSource<C, B>,
eigenda_source: EigenDABlobSource<A>,
) -> Self {
Self { ethereum_source, eigenda_source }
}
}

#[async_trait]
impl<C, B, A> DataAvailabilityProvider for EigenDADataSource<C, B, A>
where
C: ChainProvider + Send + Sync + Clone + Debug,
B: BlobProvider + Send + Sync + Clone + Debug,
A: EigenDABlobProvider + Send + Sync + Clone + Debug,
{
type Item = Bytes;

async fn next(&mut self, block_ref: &BlockInfo) -> PipelineResult<Self::Item> {
// Pull the next item from the Ethereum DA source; items are raw Bytes.
let item = self.ethereum_source.next(block_ref).await?;

// Log the raw item for debugging.
info!(target: "eth-datasource", "next item {:?}", item);

// Hand the item to the EigenDA source, which resolves AltDA certificates into the underlying blob data.
let eigenda_source_result = self.eigenda_source.next(&item).await;
info!(target: "eigenda-datasource", "eigenda_source_result {:?}", eigenda_source_result);
eigenda_source_result
}

fn clear(&mut self) {
self.eigenda_source.clear();
self.ethereum_source.clear();
}
}
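For context, a sketch of how this data source could be wired into a pipeline. The re-export path and the constructor signatures of EthereumDataSource and EigenDABlobSource are assumed from context rather than shown in this diff:

use kona_derive::sources::{EigenDABlobSource, EigenDADataSource, EthereumDataSource};
use kona_derive::traits::{BlobProvider, ChainProvider, EigenDABlobProvider};
use op_alloy_genesis::RollupConfig;

/// Sketch only: builds the EigenDA-aware DA provider handed to the derivation pipeline.
/// EthereumDataSource::new(chain, blobs, cfg) and EigenDABlobSource::new(provider) are assumed shapes.
fn new_eigenda_data_source<C, B, A>(
    chain_provider: C,
    blob_provider: B,
    eigenda_blob_provider: A,
    cfg: &RollupConfig,
) -> EigenDADataSource<C, B, A>
where
    C: ChainProvider + Send + Clone + core::fmt::Debug,
    B: BlobProvider + Send + Clone + core::fmt::Debug,
    A: EigenDABlobProvider + Send + Clone + core::fmt::Debug,
{
    let ethereum_source = EthereumDataSource::new(chain_provider, blob_provider, cfg);
    let eigenda_source = EigenDABlobSource::new(eigenda_blob_provider);
    EigenDADataSource::new(ethereum_source, eigenda_source)
}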