1
- use std :: sync :: Arc ;
2
-
1
+ //! Transaction service responsible for fetching and sending trasnsactions to the simulator.
2
+ use crate :: config :: BuilderConfig ;
3
3
use alloy:: consensus:: TxEnvelope ;
4
4
use eyre:: Error ;
5
5
use reqwest:: { Client , Url } ;
6
6
use serde:: { Deserialize , Serialize } ;
7
7
use serde_json:: from_slice;
8
8
use tokio:: { sync:: mpsc, task:: JoinHandle } ;
9
9
10
- pub use crate :: config:: BuilderConfig ;
11
-
12
10
/// Models a response from the transaction pool.
13
11
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
14
12
pub struct TxPoolResponse {
@@ -21,40 +19,47 @@ pub struct TxPoolResponse {
21
19
pub struct TxPoller {
22
20
/// Config values from the Builder.
23
21
pub config : BuilderConfig ,
24
- /// Reqwest Client for fetching transactions from the tx-pool .
22
+ /// Reqwest Client for fetching transactions from the cache .
25
23
pub client : Client ,
24
+ /// Defines the interval at which the service should poll the cache.
25
+ pub poll_interval_ms : u64 ,
26
26
}
27
27
28
- /// TxPoller implements a poller task that fetches unique transactions from the transaction pool.
28
+ /// [` TxPoller`] implements a poller task that fetches unique transactions from the transaction pool.
29
29
impl TxPoller {
30
- /// Returns a new TxPoller with the given config.
30
+ /// Returns a new [`TxPoller`] with the given config.
31
+ /// * Defaults to 1000ms poll interval (1s).
31
32
pub fn new ( config : & BuilderConfig ) -> Self {
32
- Self { config : config. clone ( ) , client : Client :: new ( ) }
33
+ Self { config : config. clone ( ) , client : Client :: new ( ) , poll_interval_ms : 1000 }
34
+ }
35
+
36
+ /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
37
+ pub fn new_with_poll_interval_ms ( config : & BuilderConfig , poll_interval_ms : u64 ) -> Self {
38
+ Self { config : config. clone ( ) , client : Client :: new ( ) , poll_interval_ms }
33
39
}
34
40
35
- /// Polls the tx-pool for unique transactions and evicts expired transactions.
36
- /// unique transactions that haven't been seen before are sent into the builder pipeline.
41
+ /// Polls the transaction cache for transactions.
37
42
pub async fn check_tx_cache ( & mut self ) -> Result < Vec < TxEnvelope > , Error > {
38
43
let url: Url = Url :: parse ( & self . config . tx_pool_url ) ?. join ( "transactions" ) ?;
39
44
let result = self . client . get ( url) . send ( ) . await ?;
40
45
let response: TxPoolResponse = from_slice ( result. text ( ) . await ?. as_bytes ( ) ) ?;
41
46
Ok ( response. transactions )
42
47
}
43
48
44
- /// Spawns a task that trawls the cache for transactions and sends along anything it finds
45
- pub fn spawn ( mut self ) -> ( mpsc:: UnboundedReceiver < Arc < TxEnvelope > > , JoinHandle < ( ) > ) {
49
+ /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
50
+ pub fn spawn ( mut self ) -> ( mpsc:: UnboundedReceiver < TxEnvelope > , JoinHandle < ( ) > ) {
46
51
let ( outbound, inbound) = mpsc:: unbounded_channel ( ) ;
47
52
let jh = tokio:: spawn ( async move {
48
53
loop {
49
54
if let Ok ( transactions) = self . check_tx_cache ( ) . await {
50
55
tracing:: debug!( count = ?transactions. len( ) , "found transactions" ) ;
51
- for tx in transactions. iter ( ) {
52
- if let Err ( err) = outbound. send ( Arc :: new ( tx . clone ( ) ) ) {
53
- tracing:: error!( err = ?err, "failed to send transaction outbound " ) ;
56
+ for tx in transactions. into_iter ( ) {
57
+ if let Err ( err) = outbound. send ( tx ) {
58
+ tracing:: error!( err = ?err, "failed to send transaction - channel is dropped. " ) ;
54
59
}
55
60
}
56
61
}
57
- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
62
+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( self . poll_interval_ms ) ) . await ;
58
63
}
59
64
} ) ;
60
65
( inbound, jh)
0 commit comments