Re-claim forwarded HTLCs on startup
Because `ChannelMonitorUpdate`s can complete asynchronously and
out-of-order now, a `commitment_signed` `ChannelMonitorUpdate` from
a downstream channel could complete prior to the preimage
`ChannelMonitorUpdate` on the upstream channel. In that case, we may
not get an `update_fulfill_htlc` replay on startup. Thus, we have to
ensure any payment preimages contained in that downstream update are
re-claimed on startup.

Here we do this during the existing walk of the `ChannelMonitor`
preimages for closed channels.
TheBlueMatt committed Jul 8, 2023
1 parent 63217bc commit 345f8df
Showing 1 changed file with 98 additions and 26 deletions.
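
As a rough, self-contained sketch of the idea before the diff: walk the deserialized `ChannelMonitor`s, collect forwarded HTLCs whose preimage was already learned on the downstream (outbound) edge, and replay those claims on the upstream edge once the manager is rebuilt. The types and the `collect_claims_to_replay` helper below are invented for illustration only; the actual change uses `get_all_current_outbound_htlcs`, `HTLCSource::PreviousHopData` and `claim_funds_internal` as shown in the diff.

#[derive(Clone, Copy)]
struct PaymentPreimage([u8; 32]);

/// Where an outbound HTLC came from (simplified stand-in, not LDK's enum).
#[derive(Clone, Copy)]
enum HtlcSource {
    /// A payment we originated ourselves; handled by outbound-payment tracking.
    OutboundRoute,
    /// An HTLC forwarded from an upstream (inbound) channel.
    PreviousHop { upstream_channel_id: [u8; 32] },
}

/// Stand-in for a deserialized `ChannelMonitor` of a downstream channel.
struct DownstreamMonitor {
    channel_id: [u8; 32],
    /// (source, amount_msat, preimage if we already learned it downstream)
    outbound_htlcs: Vec<(HtlcSource, u64, Option<PaymentPreimage>)>,
}

/// A claim that must be replayed on the upstream edge after startup.
struct PendingClaim {
    source: HtlcSource,
    preimage: PaymentPreimage,
    downstream_value_msat: u64,
    downstream_channel_id: [u8; 32],
}

fn collect_claims_to_replay(monitors: &[DownstreamMonitor]) -> Vec<PendingClaim> {
    let mut claims = Vec::new();
    for monitor in monitors {
        for &(source, amount_msat, preimage_opt) in &monitor.outbound_htlcs {
            // Only forwarded HTLCs need replaying here; if the preimage is
            // unknown on the downstream edge there is nothing to claim yet.
            if let (HtlcSource::PreviousHop { .. }, Some(preimage)) = (source, preimage_opt) {
                claims.push(PendingClaim {
                    source,
                    preimage,
                    downstream_value_msat: amount_msat,
                    downstream_channel_id: monitor.channel_id,
                });
            }
        }
    }
    claims
}

fn main() {
    let monitors = vec![DownstreamMonitor {
        channel_id: [2; 32],
        outbound_htlcs: vec![(
            HtlcSource::PreviousHop { upstream_channel_id: [1; 32] },
            10_000,
            Some(PaymentPreimage([42; 32])),
        )],
    }];
    for claim in collect_claims_to_replay(&monitors) {
        // In the real manager this is where the claim is replayed on the
        // upstream channel (the diff does this via `claim_funds_internal`).
        println!(
            "replay {} msat, preimage starts {:02x?}, downstream channel {:02x?}",
            claim.downstream_value_msat,
            &claim.preimage.0[..2],
            &claim.downstream_channel_id[..2],
        );
    }
}
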
lightning/src/ln/channelmanager.rs (124 changes: 98 additions & 26 deletions)
@@ -1098,7 +1098,6 @@ where
/// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,

#[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool,

persistence_notifier: Notifier,
@@ -1872,9 +1871,7 @@ macro_rules! handle_new_monitor_update {
// update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
// any case so that it won't deadlock.
debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
#[cfg(debug_assertions)] {
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
}
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
match $update_res {
ChannelMonitorUpdateStatus::InProgress => {
log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
@@ -2060,7 +2057,6 @@ where
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
#[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),

@@ -4097,7 +4093,6 @@ where
fn process_background_events(&self) -> NotifyOption {
debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);

#[cfg(debug_assertions)]
self.background_events_processed_since_startup.store(true, Ordering::Release);

let mut background_events = Vec::new();
@@ -4688,6 +4683,11 @@ where
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!

// If we haven't yet run background events assume we're still deserializing and shouldn't
// actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
// `BackgroundEvent`s.
let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);

{
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.outpoint.to_channel_id();
@@ -4714,14 +4714,26 @@ where
log_bytes!(chan_id), action);
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
if let Err(e) = res {
// TODO: This is a *critical* error - we probably updated the outbound edge
// of the HTLC's monitor with a preimage. We should retry this monitor
// update over and over again until morale improves.
log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
return Err((counterparty_node_id, e));
if !during_init {
let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
if let Err(e) = res {
// TODO: This is a *critical* error - we probably updated the outbound edge
// of the HTLC's monitor with a preimage. We should retry this monitor
// update over and over again until morale improves.
log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
return Err((counterparty_node_id, e));
}
} else {
// If we're running during init we cannot update a monitor directly -
// they probably haven't actually been loaded yet. Instead, push the
// monitor update as a background event.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo: prev_hop.outpoint,
update: monitor_update.clone(),
});
}
}
return Ok(());
@@ -4734,16 +4746,34 @@ where
payment_preimage,
}],
};
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
// channel, or we must have an ability to receive the same event and try
// again on restart.
log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
payment_preimage, update_res);

if !during_init {
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
// channel, or we must have an ability to receive the same event and try
// again on restart.
log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
payment_preimage, update_res);
}
} else {
// If we're running during init we cannot update a monitor directly - they probably
// haven't actually been loaded yet. Instead, push the monitor update as a background
// event.
// Note that while it's safe to use `ClosingMonitorUpdateRegeneratedOnStartup` here (the
// channel is already closed) we need to ultimately handle the monitor update
// completion action only after we've completed the monitor update. This is the only
// way to guarantee this update *will* be regenerated on startup (otherwise if this was
// from a forwarded HTLC the downstream preimage may be deleted before we claim
// upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
// complete the monitor update completion action from `completion_action`.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((
prev_hop.outpoint, preimage_update,
)));
}
// Note that we do process the completion action here. This totally could be a
// duplicate claim, but we have no way of knowing without interrogating the
@@ -4761,6 +4791,8 @@ where
fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
"We don't support claim_htlc claims during startup - monitors may not be available yet");
self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
},
HTLCSource::PreviousHopData(hop_data) => {
@@ -8485,6 +8517,11 @@ where
// Note that we have to do the above replays before we push new monitor updates.
pending_background_events.append(&mut close_background_events);

// If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
// should ensure we try them again on the inbound edge. We put them here and do so after we
// have a fully-constructed `ChannelManager` at the end.
let mut pending_claims_to_replay = Vec::new();

{
// If we're tracking pending payments, ensure we haven't lost any by looking at the
// ChannelMonitor data for any channels for which we do not have authoritative state
@@ -8495,7 +8532,8 @@ where
// We only rebuild the pending payments map if we were most recently serialized by
// 0.0.102+
for (_, monitor) in args.channel_monitors.iter() {
if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
if counterparty_opt.is_none() {
for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
if path.hops.is_empty() {
@@ -8589,6 +8627,33 @@ where
}
}
}

// Whether the downstream channel was closed or not, try to re-apply any payment
// preimages from it which may be needed in upstream channels for forwarded
// payments.
let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
.into_iter()
.filter_map(|(htlc_source, (htlc, preimage_opt))| {
if let HTLCSource::PreviousHopData(_) = htlc_source {
if let Some(payment_preimage) = preimage_opt {
Some((htlc_source, payment_preimage, htlc.amount_msat,
// Check if `counterparty_opt.is_none()` to see if the
// downstream chan is closed (because we don't have a
// channel_id -> peer map entry).
counterparty_opt.is_none(),
monitor.get_funding_txo().0.to_channel_id()))
} else { None }
} else {
// If it was an outbound payment, we've handled it above - if a preimage
// came in and we persisted the `ChannelManager` we either handled it and
// are good to go or the channel force-closed - we don't have to handle the
// channel still live case here.
None
}
});
for tuple in outbound_claimed_htlcs_iter {
pending_claims_to_replay.push(tuple);
}
}
}

@@ -8821,7 +8886,6 @@ where
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
#[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),

@@ -8840,6 +8904,14 @@ where
channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}

for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
// channel is closed we just assume that it probably came from an on-chain claim.
channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
downstream_closed, downstream_chan_id);
}

//TODO: Broadcast channel update for closed channels, but only after we've made a
//connection or two.

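For orientation, here is a rough sketch of the `during_init` gating the diff adds along the claim path: before the first background-event pass we are still deserializing, so preimage-bearing monitor updates are queued as background events instead of being applied directly. The names here (`Node`, `MonitorUpdate`, `apply_to_chain_monitor`) are invented stand-ins, not LDK's real types; the real code goes through `handle_new_monitor_update!` or `chain_monitor.update_channel`.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Placeholder for a preimage-bearing `ChannelMonitorUpdate` (invented type).
struct MonitorUpdate;

/// Simplified stand-in for the background events the diff queues.
enum BackgroundEvent {
    MonitorUpdateRegeneratedOnStartup { funding_txo: [u8; 32], update: MonitorUpdate },
}

struct Node {
    background_events_processed_since_startup: AtomicBool,
    pending_background_events: Mutex<Vec<BackgroundEvent>>,
}

impl Node {
    fn apply_preimage_update(&self, funding_txo: [u8; 32], update: MonitorUpdate) {
        // If background events haven't run yet, we're still starting up and the
        // monitors may not be loaded, so queue the update for later replay.
        let during_init =
            !self.background_events_processed_since_startup.load(Ordering::Acquire);
        if during_init {
            self.pending_background_events.lock().unwrap().push(
                BackgroundEvent::MonitorUpdateRegeneratedOnStartup { funding_txo, update },
            );
        } else {
            // Normal operation: hand the update to the chain monitor directly.
            apply_to_chain_monitor(funding_txo, update);
        }
    }
}

/// Stand-in for applying the update to the chain monitor in normal operation.
fn apply_to_chain_monitor(_funding_txo: [u8; 32], _update: MonitorUpdate) {}

fn main() {
    let node = Node {
        background_events_processed_since_startup: AtomicBool::new(false),
        pending_background_events: Mutex::new(Vec::new()),
    };
    // During init the update is queued rather than applied.
    node.apply_preimage_update([0u8; 32], MonitorUpdate);
    assert_eq!(node.pending_background_events.lock().unwrap().len(), 1);
}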
