Skip to content

Commit 9579b76

Browse files
committed
Handle events asynchronously in the BackgroundProcessor's async variant
1 parent 9e9615c commit 9579b76

File tree

1 file changed

+62
-12
lines changed
  • lightning-background-processor/src

1 file changed

+62
-12
lines changed

lightning-background-processor/src/lib.rs

+62-12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMes
2323
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2424
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2525
use lightning::routing::scoring::WriteableScore;
26+
#[cfg(feature = "futures")]
27+
use lightning::util::events::AsyncEventHandler;
2628
use lightning::util::events::{Event, EventHandler, EventsProvider};
2729
use lightning::util::logger::Logger;
2830
use lightning::util::persist::Persister;
@@ -34,6 +36,11 @@ use std::thread::JoinHandle;
3436
use std::time::{Duration, Instant};
3537
use std::ops::Deref;
3638

39+
#[cfg(feature = "futures")]
40+
use core::future::Future as StdFuture;
41+
#[cfg(feature = "futures")]
42+
use core::pin::Pin;
43+
3744
#[cfg(feature = "futures")]
3845
use futures_util::{select_biased, future::FutureExt};
3946

@@ -225,16 +232,49 @@ where A::Target: chain::Access, L::Target: Logger {
225232
}
226233
}
227234

235+
#[cfg(feature = "futures")]
236+
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
237+
struct DecoratingAsyncEventHandler<
238+
'a,
239+
E: AsyncEventHandler,
240+
PGS: Deref<Target = P2PGossipSync<G, A, L>>,
241+
RGS: Deref<Target = RapidGossipSync<G, L>>,
242+
G: Deref<Target = NetworkGraph<L>>,
243+
A: Deref,
244+
L: Deref,
245+
>
246+
where A::Target: chain::Access, L::Target: Logger {
247+
async_event_handler: E,
248+
gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
249+
}
250+
251+
#[cfg(feature = "futures")]
252+
impl<
253+
'a,
254+
E: AsyncEventHandler,
255+
PGS: Deref<Target = P2PGossipSync<G, A, L>>,
256+
RGS: Deref<Target = RapidGossipSync<G, L>>,
257+
G: Deref<Target = NetworkGraph<L>>,
258+
A: Deref,
259+
L: Deref,
260+
> AsyncEventHandler for DecoratingAsyncEventHandler<'a, E, PGS, RGS, G, A, L>
261+
where A::Target: chain::Access, L::Target: Logger {
262+
fn handle_event_async<'b>(&'b self, event: &'b Event) -> Pin<Box<dyn StdFuture<Output = ()> + '_>> {
263+
Box::pin(async move {
264+
if let Some(network_graph) = self.gossip_sync.network_graph() {
265+
network_graph.handle_event(event); // TODO: Also make async?
266+
}
267+
self.async_event_handler.handle_event_async(event).await;
268+
})
269+
}
270+
}
271+
228272
macro_rules! define_run_body {
229-
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
273+
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
274+
$channel_manager: ident, $process_channel_manager_events: expr,
230275
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231276
$loop_exit_check: expr, $await: expr)
232277
=> { {
233-
let event_handler = DecoratingEventHandler {
234-
event_handler: $event_handler,
235-
gossip_sync: &$gossip_sync,
236-
};
237-
238278
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
239279
$channel_manager.timer_tick_occurred();
240280

@@ -245,8 +285,8 @@ macro_rules! define_run_body {
245285
let mut have_pruned = false;
246286

247287
loop {
248-
$channel_manager.process_pending_events(&event_handler);
249-
$chain_monitor.process_pending_events(&event_handler);
288+
$process_channel_manager_events;
289+
$process_chain_monitor_events;
250290

251291
// Note that the PeerManager::process_events may block on ChannelManager's locks,
252292
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -379,7 +419,7 @@ pub async fn process_events_async<
379419
CMH: 'static + Deref + Send + Sync,
380420
RMH: 'static + Deref + Send + Sync,
381421
OMH: 'static + Deref + Send + Sync,
382-
EH: 'static + EventHandler + Send,
422+
EH: 'static + AsyncEventHandler + Send,
383423
PS: 'static + Deref + Send,
384424
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
385425
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
@@ -392,7 +432,7 @@ pub async fn process_events_async<
392432
SleepFuture: core::future::Future<Output = bool>,
393433
Sleeper: Fn(Duration) -> SleepFuture
394434
>(
395-
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
435+
persister: PS, async_event_handler: EH, chain_monitor: M, channel_manager: CM,
396436
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
397437
sleeper: Sleeper,
398438
) -> Result<(), std::io::Error>
@@ -412,7 +452,12 @@ where
412452
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
413453
{
414454
let mut should_continue = true;
415-
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
455+
let async_event_handler = DecoratingAsyncEventHandler {
456+
async_event_handler,
457+
gossip_sync: &gossip_sync,
458+
};
459+
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events_async(&async_event_handler).await,
460+
channel_manager, channel_manager.process_pending_events_async(&async_event_handler).await,
416461
gossip_sync, peer_manager, logger, scorer, should_continue, {
417462
select_biased! {
418463
_ = channel_manager.get_persistable_update_future().fuse() => true,
@@ -517,7 +562,12 @@ impl BackgroundProcessor {
517562
let stop_thread = Arc::new(AtomicBool::new(false));
518563
let stop_thread_clone = stop_thread.clone();
519564
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
520-
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
565+
let event_handler = DecoratingEventHandler {
566+
event_handler,
567+
gossip_sync: &gossip_sync,
568+
};
569+
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
570+
channel_manager, channel_manager.process_pending_events(&event_handler),
521571
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
522572
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
523573
});

0 commit comments

Comments
 (0)