Skip to content

Commit da16b28

Browse files
committed
updates bundler to streaming actor pattern
1 parent 533d34e commit da16b28

File tree

2 files changed

+62
-49
lines changed

2 files changed

+62
-49
lines changed

src/tasks/block.rs

-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ impl BlockBuilder {
180180
error!(error = %e, "error polling bundles");
181181
}
182182
}
183-
self.bundle_poller.evict();
184183
}
185184

186185
/// Simulates a Zenith bundle against the rollup state

src/tasks/bundler.rs

+62-48
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,86 @@
11
//! Bundler service responsible for managing bundles.
2+
use std::sync::Arc;
3+
24
use super::oauth::Authenticator;
35

46
pub use crate::config::BuilderConfig;
57

8+
use alloy::consensus::TxEnvelope;
9+
use alloy::eips::eip2718::Encodable2718;
10+
use alloy::rpc::types::mev::EthSendBundle;
611
use oauth2::TokenResponse;
712
use reqwest::Url;
813
use serde::{Deserialize, Serialize};
9-
use std::collections::HashMap;
10-
use std::time::{Duration, Instant};
14+
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
15+
use tokio::task::JoinHandle;
1116
use zenith_types::ZenithEthBundle;
1217

13-
/// A bundle response from the tx-pool endpoint, containing a UUID and a
14-
/// [`ZenithEthBundle`].
18+
/// Holds a Signet bundle from the cache that has a unique identifier
19+
/// and a Zenith bundle
1520
#[derive(Debug, Clone, Serialize, Deserialize)]
1621
pub struct Bundle {
17-
/// The bundle id (a UUID)
22+
/// Cache identifier for the bundle
1823
pub id: String,
19-
/// The bundle itself
24+
/// The Zenith bundle for this bundle
2025
pub bundle: ZenithEthBundle,
2126
}
2227

28+
impl PartialEq for Bundle {
29+
fn eq(&self, other: &Self) -> bool {
30+
self.id == other.id
31+
}
32+
}
33+
34+
impl Eq for Bundle {}
35+
36+
impl From<TxEnvelope> for Bundle {
37+
fn from(tx: TxEnvelope) -> Self {
38+
let tx_vec = vec![tx.encoded_2718().into()];
39+
Self {
40+
id: tx.tx_hash().to_string(),
41+
bundle: ZenithEthBundle {
42+
bundle: EthSendBundle {
43+
txs: tx_vec,
44+
reverting_tx_hashes: vec![*tx.tx_hash()],
45+
block_number: 0, // TODO: This needs to be set properly somewhere after into() is called
46+
min_timestamp: None,
47+
max_timestamp: None,
48+
replacement_uuid: None,
49+
},
50+
host_fills: None,
51+
},
52+
}
53+
}
54+
}
55+
2356
/// Response from the tx-pool containing a list of bundles.
2457
#[derive(Debug, Clone, Serialize, Deserialize)]
2558
pub struct TxPoolBundleResponse {
26-
/// the list of bundles
59+
/// Bundle responses are availabel on the bundles property
2760
pub bundles: Vec<Bundle>,
2861
}
2962

3063
/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
31-
#[derive(Debug)]
64+
#[derive(Debug, Clone)]
3265
pub struct BundlePoller {
33-
/// Configuration
66+
/// The builder configuration values.
3467
pub config: BuilderConfig,
35-
/// [`Authenticator`] for fetching OAuth tokens
68+
/// Authentication module that periodically fetches and stores auth tokens.
3669
pub authenticator: Authenticator,
37-
/// Already seen bundle UUIDs
38-
pub seen_uuids: HashMap<String, Instant>,
3970
}
4071

4172
/// Implements a poller for the block builder to pull bundles from the tx cache.
4273
impl BundlePoller {
4374
/// Creates a new BundlePoller from the provided builder config.
4475
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
45-
Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
76+
Self { config: config.clone(), authenticator }
4677
}
4778

4879
/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
4980
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
50-
let mut unique: Vec<Bundle> = Vec::new();
51-
5281
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
5382
let token = self.authenticator.fetch_oauth_token().await?;
5483

55-
// Add the token to the request headers
5684
let result = reqwest::Client::new()
5785
.get(bundle_url)
5886
.bearer_auth(token.access_token().secret())
@@ -61,42 +89,28 @@ impl BundlePoller {
6189
.error_for_status()?;
6290

6391
let body = result.bytes().await?;
64-
let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;
65-
66-
bundles.bundles.iter().for_each(|bundle| {
67-
self.check_seen_bundles(bundle.clone(), &mut unique);
68-
});
92+
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;
6993

70-
Ok(unique)
94+
Ok(resp.bundles)
7195
}
7296

73-
/// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
74-
fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
75-
self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
76-
// add to the set of unique bundles
77-
unique.push(bundle.clone());
78-
Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
79-
});
80-
}
81-
82-
/// Evicts expired bundles from the cache.
83-
pub fn evict(&mut self) {
84-
let expired_keys: Vec<String> = self
85-
.seen_uuids
86-
.iter()
87-
.filter_map(
88-
|(key, expiry)| {
89-
if expiry.elapsed().is_zero() {
90-
Some(key.clone())
91-
} else {
92-
None
97+
/// Spawns a task that simply sends out any bundles it ever finds
98+
pub fn spawn(mut self) -> (UnboundedReceiver<Arc<Bundle>>, JoinHandle<()>) {
99+
let (outbound, inbound) = unbounded_channel();
100+
let jh = tokio::spawn(async move {
101+
loop {
102+
if let Ok(bundles) = self.check_bundle_cache().await {
103+
tracing::debug!(count = ?bundles.len(), "found bundles");
104+
for bundle in bundles.iter() {
105+
if let Err(err) = outbound.send(Arc::new(bundle.clone())) {
106+
tracing::error!(err = ?err, "Failed to send bundle");
107+
}
93108
}
94-
},
95-
)
96-
.collect();
109+
}
110+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
111+
}
112+
});
97113

98-
for key in expired_keys {
99-
self.seen_uuids.remove(&key);
100-
}
114+
(inbound, jh)
101115
}
102116
}

0 commit comments

Comments
 (0)