Skip to content

Commit 0bb1798

Browse files
committed
updates bundler to streaming actor pattern
1 parent aaeb7c6 commit 0bb1798

File tree

2 files changed

+39
-49
lines changed

2 files changed

+39
-49
lines changed

src/tasks/block.rs

-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ impl BlockBuilder {
180180
error!(error = %e, "error polling bundles");
181181
}
182182
}
183-
self.bundle_poller.evict();
184183
}
185184

186185
/// Simulates a Zenith bundle against the rollup state

src/tasks/bundler.rs

+39-48
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,63 @@
11
//! Bundler service responsible for managing bundles.
2+
use std::sync::Arc;
3+
24
use super::oauth::Authenticator;
35

46
pub use crate::config::BuilderConfig;
57

68
use oauth2::TokenResponse;
79
use reqwest::Url;
810
use serde::{Deserialize, Serialize};
9-
use std::collections::HashMap;
10-
use std::time::{Duration, Instant};
11+
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
12+
use tokio::task::JoinHandle;
1113
use zenith_types::ZenithEthBundle;
1214

13-
/// A bundle response from the tx-pool endpoint, containing a UUID and a
14-
/// [`ZenithEthBundle`].
15+
/// Holds a Signet bundle from the cache that has a unique identifier
16+
/// and a Zenith bundle
1517
#[derive(Debug, Clone, Serialize, Deserialize)]
1618
pub struct Bundle {
17-
/// The bundle id (a UUID)
19+
/// Cache identifier for the bundle
1820
pub id: String,
19-
/// The bundle itself
21+
/// The Zenith bundle for this bundle
2022
pub bundle: ZenithEthBundle,
2123
}
2224

25+
impl PartialEq for Bundle {
26+
fn eq(&self, other: &Self) -> bool {
27+
self.id == other.id
28+
}
29+
}
30+
31+
impl Eq for Bundle {}
32+
2333
/// Response from the tx-pool containing a list of bundles.
2434
#[derive(Debug, Clone, Serialize, Deserialize)]
2535
pub struct TxPoolBundleResponse {
26-
/// the list of bundles
36+
/// Bundle responses are availabel on the bundles property
2737
pub bundles: Vec<Bundle>,
2838
}
2939

3040
/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
31-
#[derive(Debug)]
41+
#[derive(Debug, Clone)]
3242
pub struct BundlePoller {
33-
/// Configuration
43+
/// The builder configuration values.
3444
pub config: BuilderConfig,
35-
/// [`Authenticator`] for fetching OAuth tokens
45+
/// Authentication module that periodically fetches and stores auth tokens.
3646
pub authenticator: Authenticator,
37-
/// Already seen bundle UUIDs
38-
pub seen_uuids: HashMap<String, Instant>,
3947
}
4048

4149
/// Implements a poller for the block builder to pull bundles from the tx cache.
4250
impl BundlePoller {
4351
/// Creates a new BundlePoller from the provided builder config.
4452
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
45-
Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() }
53+
Self { config: config.clone(), authenticator }
4654
}
4755

4856
/// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
4957
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
50-
let mut unique: Vec<Bundle> = Vec::new();
51-
5258
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
5359
let token = self.authenticator.fetch_oauth_token().await?;
5460

55-
// Add the token to the request headers
5661
let result = reqwest::Client::new()
5762
.get(bundle_url)
5863
.bearer_auth(token.access_token().secret())
@@ -61,42 +66,28 @@ impl BundlePoller {
6166
.error_for_status()?;
6267

6368
let body = result.bytes().await?;
64-
let bundles: TxPoolBundleResponse = serde_json::from_slice(&body)?;
69+
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;
6570

66-
bundles.bundles.iter().for_each(|bundle| {
67-
self.check_seen_bundles(bundle.clone(), &mut unique);
68-
});
69-
70-
Ok(unique)
71-
}
72-
73-
/// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
74-
fn check_seen_bundles(&mut self, bundle: Bundle, unique: &mut Vec<Bundle>) {
75-
self.seen_uuids.entry(bundle.id.clone()).or_insert_with(|| {
76-
// add to the set of unique bundles
77-
unique.push(bundle.clone());
78-
Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration)
79-
});
71+
Ok(resp.bundles)
8072
}
8173

82-
/// Evicts expired bundles from the cache.
83-
pub fn evict(&mut self) {
84-
let expired_keys: Vec<String> = self
85-
.seen_uuids
86-
.iter()
87-
.filter_map(
88-
|(key, expiry)| {
89-
if expiry.elapsed().is_zero() {
90-
Some(key.clone())
91-
} else {
92-
None
74+
/// Spawns a task that simply sends out any bundles it ever finds
75+
pub fn spawn(mut self) -> (UnboundedReceiver<Arc<Bundle>>, JoinHandle<()>) {
76+
let (outbound, inbound) = unbounded_channel();
77+
let jh = tokio::spawn(async move {
78+
loop {
79+
if let Ok(bundles) = self.check_bundle_cache().await {
80+
tracing::debug!(count = ?bundles.len(), "found bundles");
81+
for bundle in bundles.iter() {
82+
if let Err(err) = outbound.send(Arc::new(bundle.clone())) {
83+
tracing::error!(err = ?err, "Failed to send bundle");
84+
}
9385
}
94-
},
95-
)
96-
.collect();
86+
}
87+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
88+
}
89+
});
9790

98-
for key in expired_keys {
99-
self.seen_uuids.remove(&key);
100-
}
91+
(inbound, jh)
10192
}
10293
}

0 commit comments

Comments
 (0)