Re-claim forwarded HTLCs on startup #2364

Merged
Changes from all commits
4 changes: 4 additions & 0 deletions lightning/src/ln/channel.rs
@@ -527,6 +527,10 @@ pub(super) struct ReestablishResponses {
}

/// The return type of `force_shutdown`
///
/// Contains a (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
/// followed by a list of HTLCs to fail back, each given as a (source, payment hash, this
/// channel's counterparty_node_id, channel_id) tuple.
pub(crate) type ShutdownResult = (
Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>
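As a side note on the tuple shape documented above, here is a minimal sketch of how a `ShutdownResult` is typically consumed: apply the optional closing `ChannelMonitorUpdate` first, then fail each listed HTLC back toward its inbound edge. The types below are stand-ins, not LDK's real definitions.

```rust
// Stand-in types only; the real ones live in `lightning` / `bitcoin` and differ in detail.
type PublicKey = [u8; 33];
type OutPoint = ([u8; 32], u16);
struct ChannelMonitorUpdate;
struct HTLCSource;
struct PaymentHash(pub [u8; 32]);

type ShutdownResult = (
	Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
	Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>,
);

fn consume_shutdown_result(res: ShutdownResult) {
	let (monitor_update_opt, dropped_htlcs) = res;
	if let Some((_counterparty_node_id, _funding_txo, _update)) = monitor_update_opt {
		// Hand the closing monitor update to the chain monitor here.
	}
	for (_source, _payment_hash, _counterparty_node_id, _channel_id) in dropped_htlcs {
		// Fail each still-pending HTLC backwards on its inbound channel here.
	}
}
```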
144 changes: 108 additions & 36 deletions lightning/src/ln/channelmanager.rs
@@ -507,19 +507,19 @@ struct ClaimablePayments {
/// running normally, and specifically must be processed before any other non-background
/// [`ChannelMonitorUpdate`]s are applied.
enum BackgroundEvent {
/// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from
/// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public
/// key to handle channel resumption, whereas if the channel has been force-closed we do not
/// need the counterparty node_id.
/// Handle a ChannelMonitorUpdate which closes the channel or is for an already-closed channel.
/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
/// maybe-non-closing variant needs a public key to handle channel resumption, whereas if the
/// channel has been force-closed we do not need the counterparty node_id.
///
/// Note that any such events are lost on shutdown, so in general they must be updates which
/// are regenerated on startup.
ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
/// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the
/// channel to continue normal operation.
///
/// In general this should be used rather than
/// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the
/// [`Self::ClosedMonitorUpdateRegeneratedOnStartup`], however in cases where the
/// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`]
/// error the other variant is acceptable.
///
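To make the distinction in the comments above concrete, here is a minimal sketch of the two startup-replay event shapes: the already-closed variant carries no counterparty node id, while the maybe-live variant must carry one so the channel can be resumed once the update completes. Types are stand-ins; only the variant and field names mirror the diff.

```rust
// Stand-in types; everything beyond the variant/field names is simplified.
type PublicKey = [u8; 33];
type OutPoint = ([u8; 32], u16);
struct ChannelMonitorUpdate;

enum BackgroundEvent {
	ClosedMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)),
	MonitorUpdateRegeneratedOnStartup {
		counterparty_node_id: PublicKey,
		funding_txo: OutPoint,
		update: ChannelMonitorUpdate,
	},
}

fn process(event: BackgroundEvent) {
	match event {
		BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
			// Channel already closed: just apply the update, nobody needs to be resumed.
			let _ = (funding_txo, update);
		},
		BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => {
			// Channel may still be live: the node id routes the post-update
			// resumption through the per-peer state.
			let _ = (counterparty_node_id, funding_txo, update);
		},
	}
}
```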
@@ -1098,7 +1098,6 @@ where
/// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,

#[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool,

persistence_notifier: Notifier,
@@ -1872,9 +1871,7 @@ macro_rules! handle_new_monitor_update {
// update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in
// any case so that it won't deadlock.
debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread);
#[cfg(debug_assertions)] {
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
}
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
match $update_res {
ChannelMonitorUpdateStatus::InProgress => {
log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
@@ -2060,7 +2057,6 @@ where
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
#[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),

@@ -4097,7 +4093,6 @@ where
fn process_background_events(&self) -> NotifyOption {
debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);

#[cfg(debug_assertions)]
self.background_events_processed_since_startup.store(true, Ordering::Release);

let mut background_events = Vec::new();
@@ -4108,7 +4103,7 @@

for event in background_events.drain(..) {
match event {
BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => {
// The channel has already been closed, so no use bothering to care about the
// monitor update completing.
let _ = self.chain_monitor.update_channel(funding_txo, &update);
@@ -4688,6 +4683,11 @@ where
-> Result<(), (PublicKey, MsgHandleErrInternal)> {
//TODO: Delay the claimed_funds relaying just like we do outbound relay!

// If we haven't yet run background events assume we're still deserializing and shouldn't
// actually pass `ChannelMonitorUpdate`s to users yet. Instead, queue them up as
// `BackgroundEvent`s.
let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);

{
let per_peer_state = self.per_peer_state.read().unwrap();
let chan_id = prev_hop.outpoint.to_channel_id();
@@ -4714,14 +4714,26 @@
log_bytes!(chan_id), action);
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
if let Err(e) = res {
// TODO: This is a *critical* error - we probably updated the outbound edge
// of the HTLC's monitor with a preimage. We should retry this monitor
// update over and over again until morale improves.
log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
return Err((counterparty_node_id, e));
if !during_init {
let res = handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
if let Err(e) = res {
// TODO: This is a *critical* error - we probably updated the outbound edge
// of the HTLC's monitor with a preimage. We should retry this monitor
// update over and over again until morale improves.
log_error!(self.logger, "Failed to update channel monitor with preimage {:?}", payment_preimage);
return Err((counterparty_node_id, e));
}
} else {
// If we're running during init we cannot update a monitor directly -
// they probably haven't actually been loaded yet. Instead, push the
// monitor update as a background event.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo: prev_hop.outpoint,
update: monitor_update.clone(),
});
}
}
return Ok(());
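The `during_init` branch above boils down to a small gating pattern. Here is a sketch with std-only stand-ins (not the real `ChannelManager`) showing the same flow: a flag flipped once startup background events have run, and a claim path that either applies a monitor update now or queues it for replay.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct ChannelMonitorUpdate;
enum BackgroundEvent {
	MonitorUpdateRegeneratedOnStartup(ChannelMonitorUpdate),
}

struct Manager {
	background_events_processed_since_startup: AtomicBool,
	pending_background_events: Mutex<Vec<BackgroundEvent>>,
}

impl Manager {
	fn apply_or_queue_monitor_update(&self, update: ChannelMonitorUpdate) {
		let during_init =
			!self.background_events_processed_since_startup.load(Ordering::Acquire);
		if during_init {
			// Still deserializing: monitors may not be loaded yet, so queue the
			// update to be replayed by the background-event processing on startup.
			self.pending_background_events.lock().unwrap()
				.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup(update));
		} else {
			// Normal operation: hand the update to the chain monitor immediately.
			let _ = update;
		}
	}
}
```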
@@ -4734,16 +4746,34 @@
payment_preimage,
}],
};
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
// channel, or we must have an ability to receive the same event and try
// again on restart.
log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
payment_preimage, update_res);

if !during_init {
// We update the ChannelMonitor on the backward link, after
// receiving an `update_fulfill_htlc` from the forward link.
let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
if update_res != ChannelMonitorUpdateStatus::Completed {
// TODO: This needs to be handled somehow - if we receive a monitor update
// with a preimage we *must* somehow manage to propagate it to the upstream
// channel, or we must have an ability to receive the same event and try
// again on restart.
log_error!(self.logger, "Critical error: failed to update channel monitor with preimage {:?}: {:?}",
payment_preimage, update_res);
}
} else {
// If we're running during init we cannot update a monitor directly - they probably
// haven't actually been loaded yet. Instead, push the monitor update as a background
// event.
// Note that while it's safe to use `ClosedMonitorUpdateRegeneratedOnStartup` here (the
// channel is already closed) we need to ultimately handle the monitor update
// completion action only after we've completed the monitor update. This is the only
// way to guarantee this update *will* be regenerated on startup (otherwise if this was
// from a forwarded HTLC the downstream preimage may be deleted before we claim
// upstream). Thus, we need to transition to some new `BackgroundEvent` type which will
// complete the monitor update completion action from `completion_action`.
self.pending_background_events.lock().unwrap().push(
BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
prev_hop.outpoint, preimage_update,
)));
}
// Note that we do process the completion action here. This totally could be a
// duplicate claim, but we have no way of knowing without interrogating the
Expand All @@ -4761,6 +4791,8 @@ where
fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
"We don't support claim_htlc claims during startup - monitors may not be available yet");
self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
},
HTLCSource::PreviousHopData(hop_data) => {
@@ -5695,7 +5727,7 @@ where
}

/// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
/// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
/// [`msgs::RevokeAndACK`] should be held for the given channel until some other action
/// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
/// the [`ChannelMonitorUpdate`] in question.
fn raa_monitor_updates_held(&self,
@@ -6312,7 +6344,7 @@ where
/// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
/// [`Event`] being handled) completes, this should be called to restore the channel to normal
/// operation. It will double-check that nothing *else* is also blocking the same channel from
/// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
/// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
let mut errors = Vec::new();
loop {
@@ -8230,7 +8262,7 @@ where
update_id: CLOSED_CHANNEL_UPDATE_ID,
updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
};
close_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update)));
}
}

@@ -8485,6 +8517,11 @@ where
// Note that we have to do the above replays before we push new monitor updates.
pending_background_events.append(&mut close_background_events);

// If there are any preimages for forwarded HTLCs hanging around in ChannelMonitors we
// should ensure we try them again on the inbound edge. We put them here and do so after we
// have a fully-constructed `ChannelManager` at the end.
let mut pending_claims_to_replay = Vec::new();

{
// If we're tracking pending payments, ensure we haven't lost any by looking at the
// ChannelMonitor data for any channels for which we do not have authoritative state
@@ -8495,7 +8532,8 @@
// We only rebuild the pending payments map if we were most recently serialized by
// 0.0.102+
for (_, monitor) in args.channel_monitors.iter() {
if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
let counterparty_opt = id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id());
if counterparty_opt.is_none() {
for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() {
if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source {
if path.hops.is_empty() {
Expand Down Expand Up @@ -8589,6 +8627,33 @@ where
}
}
}

// Whether the downstream channel was closed or not, try to re-apply any payment
// preimages from it which may be needed in upstream channels for forwarded
// payments.
let outbound_claimed_htlcs_iter = monitor.get_all_current_outbound_htlcs()
.into_iter()
.filter_map(|(htlc_source, (htlc, preimage_opt))| {
if let HTLCSource::PreviousHopData(_) = htlc_source {
if let Some(payment_preimage) = preimage_opt {
Some((htlc_source, payment_preimage, htlc.amount_msat,
// Check if `counterparty_opt.is_none()` to see if the
// downstream chan is closed (because we don't have a
// channel_id -> peer map entry).
counterparty_opt.is_none(),
monitor.get_funding_txo().0.to_channel_id()))
} else { None }
} else {
// If it was an outbound payment, we've handled it above - if a preimage
// came in and we persisted the `ChannelManager` we either handled it and
// are good to go or the channel force-closed - we don't have to handle the
// channel still live case here.
None
}
});
for tuple in outbound_claimed_htlcs_iter {
pending_claims_to_replay.push(tuple);
}
}
}
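The `filter_map` above reduces to the following sketch (stand-in types): keep only forwarded (previous-hop) HTLCs for which the monitor already knows a preimage, and record whether the downstream channel looks closed so the replay can guess `from_onchain` later.

```rust
// Stand-in types; the shapes loosely mirror what the monitor iteration yields.
struct PaymentPreimage([u8; 32]);
struct PreviousHopData;
enum HTLCSource { OutboundRoute, PreviousHopData(PreviousHopData) }
struct Htlc { amount_msat: u64 }

fn harvest_claims(
	htlcs: Vec<(HTLCSource, (Htlc, Option<PaymentPreimage>))>,
	downstream_closed: bool,
	channel_id: [u8; 32],
) -> Vec<(HTLCSource, PaymentPreimage, u64, bool, [u8; 32])> {
	htlcs.into_iter().filter_map(|(source, (htlc, preimage_opt))| {
		if let HTLCSource::PreviousHopData(_) = source {
			// Forwarded HTLC: replay the claim upstream if a preimage is known.
			if let Some(preimage) = preimage_opt {
				Some((source, preimage, htlc.amount_msat, downstream_closed, channel_id))
			} else { None }
		} else {
			// Outbound payments are handled by the pending-payments logic; skip.
			None
		}
	}).collect()
}
```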

@@ -8821,7 +8886,6 @@ where
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
#[cfg(debug_assertions)]
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),

@@ -8840,6 +8904,14 @@
channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}

for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
// channel is closed we just assume that it probably came from an on-chain claim.
channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
downstream_closed, downstream_chan_id);
}

//TODO: Broadcast channel update for closed channels, but only after we've made a
//connection or two.

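Finally, a sketch (stand-in types again) of the replay step at the end of deserialization: each harvested tuple is pushed back through the claim path once the `ChannelManager` exists, with `downstream_closed` standing in for `from_onchain` as the comment in the loop explains.

```rust
struct HTLCSource;
struct PaymentPreimage([u8; 32]);

struct Manager;
impl Manager {
	fn claim_funds_internal(&self, _source: HTLCSource, _payment_preimage: PaymentPreimage,
		_forwarded_htlc_value_msat: Option<u64>, _from_onchain: bool, _next_channel_id: [u8; 32])
	{
		// In the real code this re-runs the normal claim path, which during init
		// queues the monitor update as a `BackgroundEvent` instead of applying it.
	}
}

fn replay_claims(
	manager: &Manager,
	pending: Vec<(HTLCSource, PaymentPreimage, u64, bool, [u8; 32])>,
) {
	for (source, preimage, value_msat, downstream_closed, chan_id) in pending {
		manager.claim_funds_internal(source, preimage, Some(value_msat),
			downstream_closed, chan_id);
	}
}
```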