@@ -230,15 +230,11 @@ where A::Target: chain::Access, L::Target: Logger {
230
230
}
231
231
232
232
macro_rules! define_run_body {
233
- ( $persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
233
+ ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
234
+ $channel_manager: ident, $process_channel_manager_events: expr,
234
235
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
235
236
$loop_exit_check: expr, $await: expr)
236
237
=> { {
237
- let event_handler = DecoratingEventHandler {
238
- event_handler: $event_handler,
239
- gossip_sync: & $gossip_sync,
240
- } ;
241
-
242
238
log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
243
239
$channel_manager. timer_tick_occurred( ) ;
244
240
@@ -249,8 +245,8 @@ macro_rules! define_run_body {
249
245
let mut have_pruned = false ;
250
246
251
247
loop {
252
- $channel_manager . process_pending_events ( & event_handler ) ;
253
- $chain_monitor . process_pending_events ( & event_handler ) ;
248
+ $process_channel_manager_events ;
249
+ $process_chain_monitor_events ;
254
250
255
251
// Note that the PeerManager::process_events may block on ChannelManager's locks,
256
252
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -383,7 +379,8 @@ pub async fn process_events_async<
383
379
CMH : ' static + Deref + Send + Sync ,
384
380
RMH : ' static + Deref + Send + Sync ,
385
381
OMH : ' static + Deref + Send + Sync ,
386
- EH : ' static + EventHandler + Send ,
382
+ EventHandlerFuture : core:: future:: Future < Output = ( ) > ,
383
+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
387
384
PS : ' static + Deref + Send ,
388
385
M : ' static + Deref < Target = ChainMonitor < Signer , CF , T , F , L , P > > + Send + Sync ,
389
386
CM : ' static + Deref < Target = ChannelManager < CW , T , K , F , L > > + Send + Sync ,
@@ -396,7 +393,7 @@ pub async fn process_events_async<
396
393
SleepFuture : core:: future:: Future < Output = bool > ,
397
394
Sleeper : Fn ( Duration ) -> SleepFuture
398
395
> (
399
- persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
396
+ persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
400
397
gossip_sync : GossipSync < PGS , RGS , G , CA , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
401
398
sleeper : Sleeper ,
402
399
) -> Result < ( ) , std:: io:: Error >
@@ -416,7 +413,23 @@ where
416
413
PS :: Target : ' static + Persister < ' a , Signer , CW , T , K , F , L , SC > ,
417
414
{
418
415
let mut should_continue = true ;
419
- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
416
+ let async_event_handler = |event| -> core:: pin:: Pin < Box < dyn core:: future:: Future < Output = ( ) > > > {
417
+ let network_graph = gossip_sync. network_graph ( ) ;
418
+ let event_handler = & event_handler;
419
+ Box :: pin ( async move {
420
+ if let Some ( network_graph) = network_graph {
421
+ if let Event :: PaymentPathFailed { ref network_update, .. } = event {
422
+ if let Some ( network_update) = network_update {
423
+ network_graph. handle_network_update ( & network_update) ;
424
+ }
425
+ }
426
+ }
427
+ event_handler ( event) . await ;
428
+ } )
429
+ } ;
430
+ define_run_body ! ( persister,
431
+ chain_monitor, chain_monitor. process_pending_events_async( async_event_handler) . await ,
432
+ channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
420
433
gossip_sync, peer_manager, logger, scorer, should_continue, {
421
434
select_biased! {
422
435
_ = channel_manager. get_persistable_update_future( ) . fuse( ) => true ,
@@ -521,7 +534,12 @@ impl BackgroundProcessor {
521
534
let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
522
535
let stop_thread_clone = stop_thread. clone ( ) ;
523
536
let handle = thread:: spawn ( move || -> Result < ( ) , std:: io:: Error > {
524
- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
537
+ let event_handler = DecoratingEventHandler {
538
+ event_handler,
539
+ gossip_sync : & gossip_sync,
540
+ } ;
541
+ define_run_body ! ( persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
542
+ channel_manager, channel_manager. process_pending_events( & event_handler) ,
525
543
gossip_sync, peer_manager, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
526
544
channel_manager. await_persistable_update_timeout( Duration :: from_millis( 100 ) ) )
527
545
} ) ;
0 commit comments