Skip to content

Commit 01a11e6

Browse files
prestwichdylanlott
authored andcommitted
fix: various improvements to bundler tasks (#68)
1 parent c4522ee commit 01a11e6

File tree

1 file changed

+35
-9
lines changed

1 file changed

+35
-9
lines changed

src/tasks/bundler.rs

+35-9
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ use crate::tasks::oauth::Authenticator;
44
use oauth2::TokenResponse;
55
use reqwest::Url;
66
use serde::{Deserialize, Serialize};
7-
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
7+
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
88
use tokio::task::JoinHandle;
9+
use tokio::time;
10+
use tracing::{Instrument, debug, trace};
911
use zenith_types::ZenithEthBundle;
1012

1113
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
@@ -69,22 +71,46 @@ impl BundlePoller {
6971
Ok(resp.bundles)
7072
}
7173

72-
/// Spawns a task that sends bundles it finds to its channel sender.
73-
pub fn spawn(mut self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
74-
let (outbound, inbound) = unbounded_channel();
75-
let jh = tokio::spawn(async move {
76-
loop {
77-
if let Ok(bundles) = self.check_bundle_cache().await {
74+
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
75+
loop {
76+
let span = tracing::debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
77+
78+
// Enter the span for the next check.
79+
let _guard = span.enter();
80+
81+
// Check this here to avoid making the web request if we know
82+
// we don't need the results.
83+
if outbound.is_closed() {
84+
trace!("No receivers left, shutting down");
85+
break;
86+
}
87+
// exit the span after the check.
88+
drop(_guard);
89+
90+
match self.check_bundle_cache().instrument(span.clone()).await {
91+
Ok(bundles) => {
7892
tracing::debug!(count = ?bundles.len(), "found bundles");
7993
for bundle in bundles.into_iter() {
8094
if let Err(err) = outbound.send(bundle) {
8195
tracing::error!(err = ?err, "Failed to send bundle - channel is dropped");
8296
}
8397
}
8498
}
85-
tokio::time::sleep(tokio::time::Duration::from_millis(self.poll_interval_ms)).await;
99+
// If fetching was an error, we log and continue. We expect
100+
// these to be transient network issues.
101+
Err(e) => {
102+
debug!(error = %e, "Error fetching bundles");
103+
}
86104
}
87-
});
105+
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
106+
}
107+
}
108+
109+
/// Spawns a task that sends bundles it finds to its channel sender.
110+
pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
111+
let (outbound, inbound) = unbounded_channel();
112+
113+
let jh = tokio::spawn(self.task_future(outbound));
88114

89115
(inbound, jh)
90116
}

0 commit comments

Comments
 (0)