From 0928ba0149321cf02dd3099f2a05d73ce2f7edfb Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 6 Nov 2023 16:39:30 +0800 Subject: [PATCH 01/15] routing: fix and enhance logging --- channeldb/payment_control.go | 4 +++- channeldb/payment_control_test.go | 5 +---- routing/payment_lifecycle.go | 8 +++++--- routing/router.go | 3 +++ 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/channeldb/payment_control.go b/channeldb/payment_control.go index 0eadf4b1f7..6e688cf512 100644 --- a/channeldb/payment_control.go +++ b/channeldb/payment_control.go @@ -424,7 +424,9 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash, // Ensure we aren't sending more than the total payment amount. sentAmt, _ := payment.SentAmt() if sentAmt+amt > payment.Info.Value { - return ErrValueExceedsAmt + return fmt.Errorf("%w: attempted=%v, payment amount="+ + "%v", ErrValueExceedsAmt, sentAmt+amt, + payment.Info.Value) } htlcsBucket, err := bucket.CreateBucketIfNotExists( diff --git a/channeldb/payment_control_test.go b/channeldb/payment_control_test.go index 5b394edae8..7751ea3677 100644 --- a/channeldb/payment_control_test.go +++ b/channeldb/payment_control_test.go @@ -734,10 +734,7 @@ func TestPaymentControlMultiShard(t *testing.T) { b := *attempt b.AttemptID = 3 _, err = pControl.RegisterAttempt(info.PaymentIdentifier, &b) - if err != ErrValueExceedsAmt { - t.Fatalf("expected ErrValueExceedsAmt, got: %v", - err) - } + require.ErrorIs(t, err, ErrValueExceedsAmt) // Fail the second attempt. a := attempts[1] diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 1d103c5ac5..5244d4d636 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -179,7 +179,7 @@ func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte, for _, a := range payment.InFlightHTLCs() { a := a - log.Infof("Resuming payment shard %v for payment %v", + log.Infof("Resuming HTLC attempt %v for payment %v", a.AttemptID, p.identifier) p.resultCollector(&a) @@ -463,6 +463,8 @@ func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) { func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) ( *attemptResult, error) { + log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt)) + // We'll retrieve the hash specific to this shard from the // shardTracker, since it will be needed to regenerate the circuit // below. @@ -663,8 +665,8 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route, func (p *paymentLifecycle) sendAttempt( attempt *channeldb.HTLCAttempt) (*attemptResult, error) { - log.Debugf("Attempting to send payment %v (pid=%v)", p.identifier, - attempt.AttemptID) + log.Debugf("Sending HTLC attempt(id=%v, amt=%v) for payment %v", + attempt.AttemptID, attempt.Route.TotalAmount, p.identifier) rt := attempt.Route diff --git a/routing/router.go b/routing/router.go index 4169548c51..4e8e0846b7 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1133,6 +1133,9 @@ func (r *ChannelRouter) SendToRouteSkipTempErr(htlcHash lntypes.Hash, func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route, skipTempErr bool) (*channeldb.HTLCAttempt, error) { + log.Debugf("SendToRoute for payment %v with skipTempErr=%v", + htlcHash, skipTempErr) + // Calculate amount paid to receiver. 
amt := rt.ReceiverAmt() From 941716d60e8e4109f8e921dd50d464b1e155e338 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 13 Nov 2023 15:25:12 +0800 Subject: [PATCH 02/15] lntest+itest: add new test `testPaymentHTLCTimeout` This commit adds a new test case to validate that when an HTLC has timed out, the corresponding payment is marked as failed. --- itest/list_on_test.go | 8 + itest/lnd_payment_test.go | 317 +++++++++++++++++++++++++++++++++++++- lntest/harness.go | 10 ++ 3 files changed, 332 insertions(+), 3 deletions(-) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 7b3c579d62..336cec1b00 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -654,4 +654,12 @@ var allTestCases = []*lntest.TestCase{ Name: "coop close with external delivery", TestFunc: testCoopCloseWithExternalDelivery, }, + { + Name: "payment failed htlc local swept", + TestFunc: testPaymentFailedHTLCLocalSwept, + }, + { + Name: "payment succeeded htlc remote swept", + TestFunc: testPaymentSucceededHTLCRemoteSwept, + }, } diff --git a/itest/lnd_payment_test.go b/itest/lnd_payment_test.go index cdd1636058..4a93fabfec 100644 --- a/itest/lnd_payment_test.go +++ b/itest/lnd_payment_test.go @@ -10,7 +10,9 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/node" @@ -20,9 +22,318 @@ import ( "github.com/stretchr/testify/require" ) -// testSendDirectPayment creates a topology Alice->Bob and then tests that Alice -// can send a direct payment to Bob. This test modifies the fee estimator to -// return floor fee rate(1 sat/vb). +// testPaymentSucceededHTLCRemoteSwept checks that when an outgoing HTLC is +// timed out and is swept by the remote via the direct preimage spend path, the +// payment will be marked as succeeded. This test creates a topology from Alice +// -> Bob, and let Alice send payments to Bob. Bob then goes offline, such that +// Alice's outgoing HTLC will time out. Once the force close transaction is +// broadcast by Alice, she then goes offline and Bob comes back online to take +// her outgoing HTLC. And Alice should mark this payment as succeeded after she +// comes back online again. +func testPaymentSucceededHTLCRemoteSwept(ht *lntest.HarnessTest) { + // Set the feerate to be 10 sat/vb. + ht.SetFeeEstimate(2500) + + // Open a channel with 100k satoshis between Alice and Bob with Alice + // being the sole funder of the channel. + chanAmt := btcutil.Amount(100_000) + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + } + + // Create a two hop network: Alice -> Bob. + chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams) + chanPoint := chanPoints[0] + alice, bob := nodes[0], nodes[1] + + // We now create two payments, one above dust and the other below dust, + // and we should see different behavior in terms of when the payment + // will be marked as failed due to the HTLC timeout. + // + // First, create random preimages. + preimage := ht.RandomPreimage() + dustPreimage := ht.RandomPreimage() + + // Get the preimage hashes. + payHash := preimage.Hash() + dustPayHash := dustPreimage.Hash() + + // Create an hold invoice for Bob which expects a payment of 10k + // satoshis from Alice. 
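+	// A hold invoice is used so settlement stays fully under the test's
+	// control: the HTLC remains locked in until Bob explicitly settles it
+	// with the preimage after his restart later in the test.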
+ const paymentAmt = 10_000 + req := &invoicesrpc.AddHoldInvoiceRequest{ + Value: paymentAmt, + Hash: payHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + invoice := bob.RPC.AddHoldInvoice(req) + + // Create another hold invoice for Bob which expects a payment of 1k + // satoshis from Alice. + const dustAmt = 1000 + req = &invoicesrpc.AddHoldInvoiceRequest{ + Value: dustAmt, + Hash: dustPayHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + dustInvoice := bob.RPC.AddHoldInvoice(req) + + // Alice now sends both payments to Bob. + payReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + TimeoutSeconds: 3600, + } + dustPayReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: dustInvoice.PaymentRequest, + TimeoutSeconds: 3600, + } + + // We expect the payment to stay in-flight from both streams. + ht.SendPaymentAssertInflight(alice, payReq) + ht.SendPaymentAssertInflight(alice, dustPayReq) + + // We also check the payments are marked as IN_FLIGHT in Alice's + // database. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT) + + // Bob should have two incoming HTLC. + ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:]) + ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:]) + + // Alice should have two outgoing HTLCs. + ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:]) + ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:]) + + // Let Bob go offline. + restartBob := ht.SuspendNode(bob) + + // Alice force closes the channel, which puts her commitment tx into + // the mempool. + ht.CloseChannelAssertPending(alice, chanPoint, true) + + // We now let Alice go offline to avoid her sweeping her outgoing htlc. + restartAlice := ht.SuspendNode(alice) + + // Mine one block to confirm Alice's force closing tx. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Restart Bob to settle the invoice and sweep the htlc output. + require.NoError(ht, restartBob()) + + // Bob now settles the invoices, since his link with Alice is broken, + // Alice won't know the preimages. + bob.RPC.SettleInvoice(preimage[:]) + bob.RPC.SettleInvoice(dustPreimage[:]) + + // Once Bob comes back up, he should find the force closing transaction + // from Alice and try to sweep the non-dust outgoing htlc via the + // direct preimage spend. + ht.AssertNumPendingSweeps(bob, 1) + + // Mine a block to trigger the sweep. + // + // TODO(yy): remove it once `blockbeat` is implemented. + ht.MineEmptyBlocks(1) + + // Mine Bob's sweeping tx. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Let Alice come back up. Since the channel is now closed, we expect + // different behaviors based on whether the HTLC is a dust. + // - For dust payment, it should be failed now as the HTLC won't go + // onchain. + // - For non-dust payment, it should be marked as succeeded since her + // outgoing htlc is swept by Bob. + require.NoError(ht, restartAlice()) + + // Since Alice is restarted, we need to track the payments again. + payStream := alice.RPC.TrackPaymentV2(payHash[:]) + dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:]) + + // Check that the dust payment is failed in both the stream and DB. 
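+	// AssertPaymentStatus checks the status recorded in Alice's payment
+	// database, while AssertPaymentStatusFromStream waits for the terminal
+	// update on the TrackPaymentV2 stream opened above.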
+ ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED) + + // We expect the non-dust payment to marked as succeeded in Alice's + // database and also from her stream. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_SUCCEEDED) + ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_SUCCEEDED) +} + +// testPaymentFailedHTLCLocalSwept checks that when an outgoing HTLC is timed +// out and claimed onchain via the timeout path, the payment will be marked as +// failed. This test creates a topology from Alice -> Bob, and let Alice send +// payments to Bob. Bob then goes offline, such that Alice's outgoing HTLC will +// time out. Alice will also be restarted to make sure resumed payments are +// also marked as failed. +func testPaymentFailedHTLCLocalSwept(ht *lntest.HarnessTest) { + success := ht.Run("fail payment", func(t *testing.T) { + st := ht.Subtest(t) + runTestPaymentHTLCTimeout(st, false) + }) + if !success { + return + } + + ht.Run("fail resumed payment", func(t *testing.T) { + st := ht.Subtest(t) + runTestPaymentHTLCTimeout(st, true) + }) +} + +// runTestPaymentHTLCTimeout is the helper function that actually runs the +// testPaymentFailedHTLCLocalSwept. +func runTestPaymentHTLCTimeout(ht *lntest.HarnessTest, restartAlice bool) { + // Set the feerate to be 10 sat/vb. + ht.SetFeeEstimate(2500) + + // Open a channel with 100k satoshis between Alice and Bob with Alice + // being the sole funder of the channel. + chanAmt := btcutil.Amount(100_000) + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + } + + // Create a two hop network: Alice -> Bob. + chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams) + chanPoint := chanPoints[0] + alice, bob := nodes[0], nodes[1] + + // We now create two payments, one above dust and the other below dust, + // and we should see different behavior in terms of when the payment + // will be marked as failed due to the HTLC timeout. + // + // First, create random preimages. + preimage := ht.RandomPreimage() + dustPreimage := ht.RandomPreimage() + + // Get the preimage hashes. + payHash := preimage.Hash() + dustPayHash := dustPreimage.Hash() + + // Create an hold invoice for Bob which expects a payment of 10k + // satoshis from Alice. + const paymentAmt = 20_000 + req := &invoicesrpc.AddHoldInvoiceRequest{ + Value: paymentAmt, + Hash: payHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + invoice := bob.RPC.AddHoldInvoice(req) + + // Create another hold invoice for Bob which expects a payment of 1k + // satoshis from Alice. + const dustAmt = 1000 + req = &invoicesrpc.AddHoldInvoiceRequest{ + Value: dustAmt, + Hash: dustPayHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + dustInvoice := bob.RPC.AddHoldInvoice(req) + + // Alice now sends both the payments to Bob. + payReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + TimeoutSeconds: 3600, + } + dustPayReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: dustInvoice.PaymentRequest, + TimeoutSeconds: 3600, + } + + // We expect the payment to stay in-flight from both streams. + ht.SendPaymentAssertInflight(alice, payReq) + ht.SendPaymentAssertInflight(alice, dustPayReq) + + // We also check the payments are marked as IN_FLIGHT in Alice's + // database. 
+ ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT) + + // Bob should have two incoming HTLC. + ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:]) + ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:]) + + // Alice should have two outgoing HTLCs. + ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:]) + ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:]) + + // Let Bob go offline. + ht.Shutdown(bob) + + // We'll now mine enough blocks to trigger Alice to broadcast her + // commitment transaction due to the fact that the HTLC is about to + // timeout. With the default outgoing broadcast delta of zero, this + // will be the same height as the htlc expiry height. + numBlocks := padCLTV( + uint32(req.CltvExpiry - lncfg.DefaultOutgoingBroadcastDelta), + ) + ht.MineBlocks(int(numBlocks)) + + // Restart Alice if requested. + if restartAlice { + // Restart Alice to test the resumed payment is canceled. + ht.RestartNode(alice) + } + + // We now subscribe to the payment status. + payStream := alice.RPC.TrackPaymentV2(payHash[:]) + dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:]) + + // Mine a block to confirm Alice's closing transaction. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Now the channel is closed, we expect different behaviors based on + // whether the HTLC is a dust. For dust payment, it should be failed + // now as the HTLC won't go onchain. For non-dust payment, it should + // still be inflight. It won't be marked as failed unless the outgoing + // HTLC is resolved onchain. + // + // NOTE: it's possible for Bob to race against Alice using the + // preimage path. If Bob successfully claims the HTLC, Alice should + // mark the non-dust payment as succeeded. + // + // Check that the dust payment is failed in both the stream and DB. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED) + + // Check that the non-dust payment is still in-flight. + // + // NOTE: we don't check the payment status from the stream here as + // there's no new status being sent. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + + // We now have two possible cases for the non-dust payment: + // - Bob stays offline, and Alice will sweep her outgoing HTLC, which + // makes the payment failed. + // - Bob comes back online, and claims the HTLC on Alice's commitment + // via direct preimage spend, hence racing against Alice onchain. If + // he succeeds, Alice should mark the payment as succeeded. + // + // TODO(yy): test the second case once we have the RPC to clean + // mempool. + + // Since Alice's force close transaction has been confirmed, she should + // sweep her outgoing HTLC in next block. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Cleanup the channel. + ht.CleanupForceClose(alice) + + // We expect the non-dust payment to marked as failed in Alice's + // database and also from her stream. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_FAILED) +} + +// testSendDirectPayment creates a topology Alice->Bob and then tests that +// Alice can send a direct payment to Bob. This test modifies the fee estimator +// to return floor fee rate(1 sat/vb). func testSendDirectPayment(ht *lntest.HarnessTest) { // Grab Alice and Bob's nodes for convenience. 
alice, bob := ht.Alice, ht.Bob diff --git a/lntest/harness.go b/lntest/harness.go index 9a11eaa88a..3e7ae3fae8 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -399,6 +399,8 @@ func (h *HarnessTest) resetStandbyNodes(t *testing.T) { // config for the coming test. This will also inherit the // test's running context. h.RestartNodeWithExtraArgs(hn, hn.Cfg.OriginalExtraArgs) + + hn.AddToLogf("Finished test case %v", h.manager.currentTestCase) } } @@ -1771,6 +1773,14 @@ func (h *HarnessTest) SendPaymentAssertSettled(hn *node.HarnessNode, return h.SendPaymentAndAssertStatus(hn, req, lnrpc.Payment_SUCCEEDED) } +// SendPaymentAssertInflight sends a payment from the passed node and asserts +// the payment is inflight. +func (h *HarnessTest) SendPaymentAssertInflight(hn *node.HarnessNode, + req *routerrpc.SendPaymentRequest) *lnrpc.Payment { + + return h.SendPaymentAndAssertStatus(hn, req, lnrpc.Payment_IN_FLIGHT) +} + // OpenChannelRequest is used to open a channel using the method // OpenMultiChannelsAsync. type OpenChannelRequest struct { From 21112cfdf8861242c1b67803699f8bb9e5a9a1e8 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 6 Nov 2023 06:34:08 +0800 Subject: [PATCH 03/15] htlcswitch: rename `paymentID` to `attemptID` for clarity --- htlcswitch/payment_result.go | 57 ++++++++++++++++++------------------ htlcswitch/switch.go | 16 +++++----- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index cd982b8bb7..db959e2d1a 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -90,32 +90,32 @@ type networkResultStore struct { results map[uint64][]chan *networkResult resultsMtx sync.Mutex - // paymentIDMtx is a multimutex used to make sure the database and - // result subscribers map is consistent for each payment ID in case of + // attemptIDMtx is a multimutex used to make sure the database and + // result subscribers map is consistent for each attempt ID in case of // concurrent callers. - paymentIDMtx *multimutex.Mutex[uint64] + attemptIDMtx *multimutex.Mutex[uint64] } func newNetworkResultStore(db kvdb.Backend) *networkResultStore { return &networkResultStore{ backend: db, results: make(map[uint64][]chan *networkResult), - paymentIDMtx: multimutex.NewMutex[uint64](), + attemptIDMtx: multimutex.NewMutex[uint64](), } } -// storeResult stores the networkResult for the given paymentID, and -// notifies any subscribers. -func (store *networkResultStore) storeResult(paymentID uint64, +// storeResult stores the networkResult for the given attemptID, and notifies +// any subscribers. +func (store *networkResultStore) storeResult(attemptID uint64, result *networkResult) error { - // We get a mutex for this payment ID. This is needed to ensure + // We get a mutex for this attempt ID. This is needed to ensure // consistency between the database state and the subscribers in case // of concurrent calls. - store.paymentIDMtx.Lock(paymentID) - defer store.paymentIDMtx.Unlock(paymentID) + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) - log.Debugf("Storing result for paymentID=%v", paymentID) + log.Debugf("Storing result for attemptID=%v", attemptID) // Serialize the payment result. 
var b bytes.Buffer @@ -123,8 +123,8 @@ func (store *networkResultStore) storeResult(paymentID uint64, return err } - var paymentIDBytes [8]byte - binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID) + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error { networkResults, err := tx.CreateTopLevelBucket( @@ -134,7 +134,7 @@ func (store *networkResultStore) storeResult(paymentID uint64, return err } - return networkResults.Put(paymentIDBytes[:], b.Bytes()) + return networkResults.Put(attemptIDBytes[:], b.Bytes()) }) if err != nil { return err @@ -143,28 +143,27 @@ func (store *networkResultStore) storeResult(paymentID uint64, // Now that the result is stored in the database, we can notify any // active subscribers. store.resultsMtx.Lock() - for _, res := range store.results[paymentID] { + for _, res := range store.results[attemptID] { res <- result } - delete(store.results, paymentID) + delete(store.results, attemptID) store.resultsMtx.Unlock() return nil } -// subscribeResult is used to get the payment result for the given -// payment ID. It returns a channel on which the result will be delivered when -// ready. -func (store *networkResultStore) subscribeResult(paymentID uint64) ( +// subscribeResult is used to get the HTLC attempt result for the given attempt +// ID. It returns a channel on which the result will be delivered when ready. +func (store *networkResultStore) subscribeResult(attemptID uint64) ( <-chan *networkResult, error) { // We get a mutex for this payment ID. This is needed to ensure // consistency between the database state and the subscribers in case // of concurrent calls. - store.paymentIDMtx.Lock(paymentID) - defer store.paymentIDMtx.Unlock(paymentID) + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) - log.Debugf("Subscribing to result for paymentID=%v", paymentID) + log.Debugf("Subscribing to result for attemptID=%v", attemptID) var ( result *networkResult @@ -173,7 +172,7 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) ( err := kvdb.View(store.backend, func(tx kvdb.RTx) error { var err error - result, err = fetchResult(tx, paymentID) + result, err = fetchResult(tx, attemptID) switch { // Result not yet available, we will notify once a result is @@ -205,8 +204,8 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) ( // Otherwise we store the result channel for when the result is // available. store.resultsMtx.Lock() - store.results[paymentID] = append( - store.results[paymentID], resultChan, + store.results[attemptID] = append( + store.results[attemptID], resultChan, ) store.resultsMtx.Unlock() @@ -234,8 +233,8 @@ func (store *networkResultStore) getResult(pid uint64) ( } func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { - var paymentIDBytes [8]byte - binary.BigEndian.PutUint64(paymentIDBytes[:], pid) + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], pid) networkResults := tx.ReadBucket(networkResultStoreBucketKey) if networkResults == nil { @@ -243,7 +242,7 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { } // Check whether a result is already available. 
- resultBytes := networkResults.Get(paymentIDBytes[:]) + resultBytes := networkResults.Get(attemptIDBytes[:]) if resultBytes == nil { return nil, ErrPaymentIDNotFound } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 793da57dbe..d4e9518c8a 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -431,11 +431,11 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro } } -// GetAttemptResult returns the result of the payment attempt with the given +// GetAttemptResult returns the result of the HTLC attempt with the given // attemptID. The paymentHash should be set to the payment's overall hash, or // in case of AMP payments the payment's unique identifier. // -// The method returns a channel where the payment result will be sent when +// The method returns a channel where the HTLC attempt result will be sent when // available, or an error is encountered during forwarding. When a result is // received on the channel, the HTLC is guaranteed to no longer be in flight. // The switch shutting down is signaled by closing the channel. If the @@ -452,9 +452,9 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, } ) - // If the payment is not found in the circuit map, check whether a - // result is already available. - // Assumption: no one will add this payment ID other than the caller. + // If the HTLC is not found in the circuit map, check whether a result + // is already available. + // Assumption: no one will add this attempt ID other than the caller. if s.circuits.LookupCircuit(inKey) == nil { res, err := s.networkResults.getResult(attemptID) if err != nil { @@ -464,7 +464,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, c <- res nChan = c } else { - // The payment was committed to the circuits, subscribe for a + // The HTLC was committed to the circuits, subscribe for a // result. nChan, err = s.networkResults.subscribeResult(attemptID) if err != nil { @@ -474,7 +474,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, resultChan := make(chan *PaymentResult, 1) - // Since the payment was known, we can start a goroutine that can + // Since the attempt was known, we can start a goroutine that can // extract the result when it is available, and pass it on to the // caller. s.wg.Add(1) @@ -939,7 +939,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) { // Store the result to the db. This will also notify subscribers about // the result. 
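+	// The subscribers are the result channels registered through
+	// subscribeResult, typically by the router via GetAttemptResult.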
if err := s.networkResults.storeResult(attemptID, n); err != nil { - log.Errorf("Unable to complete payment for pid=%v: %v", + log.Errorf("Unable to store attempt result for pid=%v: %v", attemptID, err) return } From d28d5d7a472af32c49baa0f984732d5594cfad40 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 13 Nov 2023 15:50:29 +0800 Subject: [PATCH 04/15] itest: add `testSendToRouteFailHTLCTimeout` to check `SendToRouteV2` --- itest/list_on_test.go | 4 + itest/lnd_payment_test.go | 217 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 336cec1b00..c33536232b 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -662,4 +662,8 @@ var allTestCases = []*lntest.TestCase{ Name: "payment succeeded htlc remote swept", TestFunc: testPaymentSucceededHTLCRemoteSwept, }, + { + Name: "send to route failed htlc timeout", + TestFunc: testSendToRouteFailHTLCTimeout, + }, } diff --git a/itest/lnd_payment_test.go b/itest/lnd_payment_test.go index 4a93fabfec..317ca02eda 100644 --- a/itest/lnd_payment_test.go +++ b/itest/lnd_payment_test.go @@ -19,6 +19,7 @@ import ( "github.com/lightningnetwork/lnd/lntest/rpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/require" ) @@ -1211,3 +1212,219 @@ func sendPaymentInterceptAndCancel(ht *lntest.HarnessTest, // Cancel the context, which will disconnect the above interceptor. cancelInterceptor() } + +// testSendToRouteFailHTLCTimeout is similar to +// testPaymentFailedHTLCLocalSwept. The only difference is the `SendPayment` is +// replaced with `SendToRouteV2`. It checks that when an outgoing HTLC is timed +// out and claimed onchain via the timeout path, the payment will be marked as +// failed. This test creates a topology from Alice -> Bob, and let Alice send +// payments to Bob. Bob then goes offline, such that Alice's outgoing HTLC will +// time out. Alice will also be restarted to make sure resumed payments are +// also marked as failed. +func testSendToRouteFailHTLCTimeout(ht *lntest.HarnessTest) { + success := ht.Run("fail payment", func(t *testing.T) { + st := ht.Subtest(t) + runSendToRouteFailHTLCTimeout(st, false) + }) + if !success { + return + } + + ht.Run("fail resumed payment", func(t *testing.T) { + st := ht.Subtest(t) + runTestPaymentHTLCTimeout(st, true) + }) +} + +// runSendToRouteFailHTLCTimeout is the helper function that actually runs the +// testSendToRouteFailHTLCTimeout. +func runSendToRouteFailHTLCTimeout(ht *lntest.HarnessTest, restartAlice bool) { + // Set the feerate to be 10 sat/vb. + ht.SetFeeEstimate(2500) + + // Open a channel with 100k satoshis between Alice and Bob with Alice + // being the sole funder of the channel. + chanAmt := btcutil.Amount(100_000) + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + } + + // Create a two hop network: Alice -> Bob. + chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams) + chanPoint := chanPoints[0] + alice, bob := nodes[0], nodes[1] + + // We now create two payments, one above dust and the other below dust, + // and we should see different behavior in terms of when the payment + // will be marked as failed due to the HTLC timeout. + // + // First, create random preimages. + preimage := ht.RandomPreimage() + dustPreimage := ht.RandomPreimage() + + // Get the preimage hashes. 
+ payHash := preimage.Hash() + dustPayHash := dustPreimage.Hash() + + // Create an hold invoice for Bob which expects a payment of 10k + // satoshis from Alice. + const paymentAmt = 20_000 + req := &invoicesrpc.AddHoldInvoiceRequest{ + Value: paymentAmt, + Hash: payHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + invoice := bob.RPC.AddHoldInvoice(req) + + // Create another hold invoice for Bob which expects a payment of 1k + // satoshis from Alice. + const dustAmt = 1000 + req = &invoicesrpc.AddHoldInvoiceRequest{ + Value: dustAmt, + Hash: dustPayHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + dustInvoice := bob.RPC.AddHoldInvoice(req) + + // Construct a route to send the non-dust payment. + go func() { + // Query the route to send the payment. + routesReq := &lnrpc.QueryRoutesRequest{ + PubKey: bob.PubKeyStr, + Amt: paymentAmt, + FinalCltvDelta: finalCltvDelta, + } + routes := alice.RPC.QueryRoutes(routesReq) + require.Len(ht, routes.Routes, 1) + + route := routes.Routes[0] + require.Len(ht, route.Hops, 1) + + // Modify the hop to include MPP info. + route.Hops[0].MppRecord = &lnrpc.MPPRecord{ + PaymentAddr: invoice.PaymentAddr, + TotalAmtMsat: int64( + lnwire.NewMSatFromSatoshis(paymentAmt), + ), + } + + // Send the payment with the modified value. + req := &routerrpc.SendToRouteRequest{ + PaymentHash: payHash[:], + Route: route, + } + + // Send the payment and expect no error. + attempt := alice.RPC.SendToRouteV2(req) + require.Equal(ht, lnrpc.HTLCAttempt_FAILED, attempt.Status) + }() + + // Check that the payment is in-flight. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + + // Construct a route to send the dust payment. + go func() { + // Query the route to send the payment. + routesReq := &lnrpc.QueryRoutesRequest{ + PubKey: bob.PubKeyStr, + Amt: dustAmt, + FinalCltvDelta: finalCltvDelta, + } + routes := alice.RPC.QueryRoutes(routesReq) + require.Len(ht, routes.Routes, 1) + + route := routes.Routes[0] + require.Len(ht, route.Hops, 1) + + // Modify the hop to include MPP info. + route.Hops[0].MppRecord = &lnrpc.MPPRecord{ + PaymentAddr: dustInvoice.PaymentAddr, + TotalAmtMsat: int64( + lnwire.NewMSatFromSatoshis(dustAmt), + ), + } + + // Send the payment with the modified value. + req := &routerrpc.SendToRouteRequest{ + PaymentHash: dustPayHash[:], + Route: route, + } + + // Send the payment and expect no error. + attempt := alice.RPC.SendToRouteV2(req) + require.Equal(ht, lnrpc.HTLCAttempt_FAILED, attempt.Status) + }() + + // Check that the dust payment is in-flight. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT) + + // Bob should have two incoming HTLC. + ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:]) + ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:]) + + // Alice should have two outgoing HTLCs. + ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:]) + ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:]) + + // Let Bob go offline. + ht.Shutdown(bob) + + // We'll now mine enough blocks to trigger Alice to broadcast her + // commitment transaction due to the fact that the HTLC is about to + // timeout. With the default outgoing broadcast delta of zero, this + // will be the same height as the htlc expiry height. + numBlocks := padCLTV( + uint32(req.CltvExpiry - lncfg.DefaultOutgoingBroadcastDelta), + ) + ht.MineEmptyBlocks(int(numBlocks - 1)) + + // Restart Alice if requested. 
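+	// This exercises the startup path where the in-flight payment is
+	// reloaded from disk and resumed after the restart.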
+ if restartAlice { + // Restart Alice to test the resumed payment is canceled. + ht.RestartNode(alice) + } + + // We now subscribe to the payment status. + payStream := alice.RPC.TrackPaymentV2(payHash[:]) + dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:]) + + // Mine a block to confirm Alice's closing transaction. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Now the channel is closed, we expect different behaviors based on + // whether the HTLC is a dust. For dust payment, it should be failed + // now as the HTLC won't go onchain. For non-dust payment, it should + // still be inflight. It won't be marked as failed unless the outgoing + // HTLC is resolved onchain. + // + // Check that the dust payment is failed in both the stream and DB. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED) + + // Check that the non-dust payment is still in-flight. + // + // NOTE: we don't check the payment status from the stream here as + // there's no new status being sent. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + + // We now have two possible cases for the non-dust payment: + // - Bob stays offline, and Alice will sweep her outgoing HTLC, which + // makes the payment failed. + // - Bob comes back online, and claims the HTLC on Alice's commitment + // via direct preimage spend, hence racing against Alice onchain. If + // he succeeds, Alice should mark the payment as succeeded. + // + // TODO(yy): test the second case once we have the RPC to clean + // mempool. + + // Since Alice's force close transaction has been confirmed, she should + // sweep her outgoing HTLC in next block. + ht.MineBlocksAndAssertNumTxes(2, 1) + + // We expect the non-dust payment to marked as failed in Alice's + // database and also from her stream. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_FAILED) +} From 6cb374aea693b7ea726b7044cd2e70c9ded7cbd5 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 13 Nov 2023 17:47:25 +0800 Subject: [PATCH 05/15] htlcswitch: add new method `handlePacketAdd` Simply moves the code into a new method so it's easier to follow the method `handlePacketForward`. --- htlcswitch/switch.go | 357 +++++++++++++++++++++---------------------- 1 file changed, 178 insertions(+), 179 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index d4e9518c8a..425f997867 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1104,185 +1104,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { // payment circuit within our internal state so we can properly forward // the ultimate settle message back latter. case *lnwire.UpdateAddHTLC: - // Check if the node is set to reject all onward HTLCs and also make - // sure that HTLC is not from the source node. - if s.cfg.RejectHTLC { - failure := NewDetailedLinkError( - &lnwire.FailChannelDisabled{}, - OutgoingFailureForwardsDisabled, - ) - - return s.failAddPacket(packet, failure) - } - - // Before we attempt to find a non-strict forwarding path for - // this htlc, check whether the htlc is being routed over the - // same incoming and outgoing channel. If our node does not - // allow forwards of this nature, we fail the htlc early. This - // check is in place to disallow inefficiently routed htlcs from - // locking up our balance. 
With channels where the - // option-scid-alias feature was negotiated, we also have to be - // sure that the IDs aren't the same since one or both could be - // an alias. - linkErr := s.checkCircularForward( - packet.incomingChanID, packet.outgoingChanID, - s.cfg.AllowCircularRoute, htlc.PaymentHash, - ) - if linkErr != nil { - return s.failAddPacket(packet, linkErr) - } - - s.indexMtx.RLock() - targetLink, err := s.getLinkByMapping(packet) - if err != nil { - s.indexMtx.RUnlock() - - log.Debugf("unable to find link with "+ - "destination %v", packet.outgoingChanID) - - // If packet was forwarded from another channel link - // than we should notify this link that some error - // occurred. - linkError := NewLinkError( - &lnwire.FailUnknownNextPeer{}, - ) - - return s.failAddPacket(packet, linkError) - } - targetPeerKey := targetLink.PeerPubKey() - interfaceLinks, _ := s.getLinks(targetPeerKey) - s.indexMtx.RUnlock() - - // We'll keep track of any HTLC failures during the link - // selection process. This way we can return the error for - // precise link that the sender selected, while optimistically - // trying all links to utilize our available bandwidth. - linkErrs := make(map[lnwire.ShortChannelID]*LinkError) - - // Find all destination channel links with appropriate - // bandwidth. - var destinations []ChannelLink - for _, link := range interfaceLinks { - var failure *LinkError - - // We'll skip any links that aren't yet eligible for - // forwarding. - if !link.EligibleToForward() { - failure = NewDetailedLinkError( - &lnwire.FailUnknownNextPeer{}, - OutgoingFailureLinkNotEligible, - ) - } else { - // We'll ensure that the HTLC satisfies the - // current forwarding conditions of this target - // link. - currentHeight := atomic.LoadUint32(&s.bestHeight) - failure = link.CheckHtlcForward( - htlc.PaymentHash, packet.incomingAmount, - packet.amount, packet.incomingTimeout, - packet.outgoingTimeout, - packet.inboundFee, - currentHeight, - packet.originalOutgoingChanID, - ) - } - - // If this link can forward the htlc, add it to the set - // of destinations. - if failure == nil { - destinations = append(destinations, link) - continue - } - - linkErrs[link.ShortChanID()] = failure - } - - // If we had a forwarding failure due to the HTLC not - // satisfying the current policy, then we'll send back an - // error, but ensure we send back the error sourced at the - // *target* link. - if len(destinations) == 0 { - // At this point, some or all of the links rejected the - // HTLC so we couldn't forward it. So we'll try to look - // up the error that came from the source. - linkErr, ok := linkErrs[packet.outgoingChanID] - if !ok { - // If we can't find the error of the source, - // then we'll return an unknown next peer, - // though this should never happen. - linkErr = NewLinkError( - &lnwire.FailUnknownNextPeer{}, - ) - log.Warnf("unable to find err source for "+ - "outgoing_link=%v, errors=%v", - packet.outgoingChanID, - lnutils.SpewLogClosure(linkErrs)) - } - - log.Tracef("incoming HTLC(%x) violated "+ - "target outgoing link (id=%v) policy: %v", - htlc.PaymentHash[:], packet.outgoingChanID, - linkErr) - - return s.failAddPacket(packet, linkErr) - } - - // Choose a random link out of the set of links that can forward - // this htlc. The reason for randomization is to evenly - // distribute the htlc load without making assumptions about - // what the best channel is. 
- destination := destinations[rand.Intn(len(destinations))] // nolint:gosec - - // Retrieve the incoming link by its ShortChannelID. Note that - // the incomingChanID is never set to hop.Source here. - s.indexMtx.RLock() - incomingLink, err := s.getLinkByShortID(packet.incomingChanID) - s.indexMtx.RUnlock() - if err != nil { - // If we couldn't find the incoming link, we can't - // evaluate the incoming's exposure to dust, so we just - // fail the HTLC back. - linkErr := NewLinkError( - &lnwire.FailTemporaryChannelFailure{}, - ) - - return s.failAddPacket(packet, linkErr) - } - - // Evaluate whether this HTLC would increase our fee exposure - // over the threshold on the incoming link. If it does, fail it - // backwards. - if s.dustExceedsFeeThreshold( - incomingLink, packet.incomingAmount, true, - ) { - // The incoming dust exceeds the threshold, so we fail - // the add back. - linkErr := NewLinkError( - &lnwire.FailTemporaryChannelFailure{}, - ) - - return s.failAddPacket(packet, linkErr) - } - - // Also evaluate whether this HTLC would increase our fee - // exposure over the threshold on the destination link. If it - // does, fail it back. - if s.dustExceedsFeeThreshold( - destination, packet.amount, false, - ) { - // The outgoing dust exceeds the threshold, so we fail - // the add back. - linkErr := NewLinkError( - &lnwire.FailTemporaryChannelFailure{}, - ) - - return s.failAddPacket(packet, linkErr) - } - - // Send the packet to the destination channel link which - // manages the channel. - packet.outgoingChanID = destination.ShortChanID() - return destination.handleSwitchPacket(packet) + return s.handlePacketAdd(packet, htlc) case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: // If the source of this packet has not been set, use the @@ -3052,3 +2874,180 @@ func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID, return nil } + +// handlePacketAdd handles forwarding an Add packet. +func (s *Switch) handlePacketAdd(packet *htlcPacket, + htlc *lnwire.UpdateAddHTLC) error { + + // Check if the node is set to reject all onward HTLCs and also make + // sure that HTLC is not from the source node. + if s.cfg.RejectHTLC { + failure := NewDetailedLinkError( + &lnwire.FailChannelDisabled{}, + OutgoingFailureForwardsDisabled, + ) + + return s.failAddPacket(packet, failure) + } + + // Before we attempt to find a non-strict forwarding path for this + // htlc, check whether the htlc is being routed over the same incoming + // and outgoing channel. If our node does not allow forwards of this + // nature, we fail the htlc early. This check is in place to disallow + // inefficiently routed htlcs from locking up our balance. With + // channels where the option-scid-alias feature was negotiated, we also + // have to be sure that the IDs aren't the same since one or both could + // be an alias. + linkErr := s.checkCircularForward( + packet.incomingChanID, packet.outgoingChanID, + s.cfg.AllowCircularRoute, htlc.PaymentHash, + ) + if linkErr != nil { + return s.failAddPacket(packet, linkErr) + } + + s.indexMtx.RLock() + targetLink, err := s.getLinkByMapping(packet) + if err != nil { + s.indexMtx.RUnlock() + + log.Debugf("unable to find link with "+ + "destination %v", packet.outgoingChanID) + + // If packet was forwarded from another channel link than we + // should notify this link that some error occurred. 
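+		// The same unknown_next_peer failure is reused below for
+		// links that exist but aren't yet eligible to forward, so the
+		// sender can't tell the two cases apart.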
+ linkError := NewLinkError( + &lnwire.FailUnknownNextPeer{}, + ) + + return s.failAddPacket(packet, linkError) + } + targetPeerKey := targetLink.PeerPubKey() + interfaceLinks, _ := s.getLinks(targetPeerKey) + s.indexMtx.RUnlock() + + // We'll keep track of any HTLC failures during the link selection + // process. This way we can return the error for precise link that the + // sender selected, while optimistically trying all links to utilize + // our available bandwidth. + linkErrs := make(map[lnwire.ShortChannelID]*LinkError) + + // Find all destination channel links with appropriate bandwidth. + var destinations []ChannelLink + for _, link := range interfaceLinks { + var failure *LinkError + + // We'll skip any links that aren't yet eligible for + // forwarding. + if !link.EligibleToForward() { + failure = NewDetailedLinkError( + &lnwire.FailUnknownNextPeer{}, + OutgoingFailureLinkNotEligible, + ) + } else { + // We'll ensure that the HTLC satisfies the current + // forwarding conditions of this target link. + currentHeight := atomic.LoadUint32(&s.bestHeight) + failure = link.CheckHtlcForward( + htlc.PaymentHash, packet.incomingAmount, + packet.amount, packet.incomingTimeout, + packet.outgoingTimeout, + packet.inboundFee, + currentHeight, + packet.originalOutgoingChanID, + ) + } + + // If this link can forward the htlc, add it to the set of + // destinations. + if failure == nil { + destinations = append(destinations, link) + continue + } + + linkErrs[link.ShortChanID()] = failure + } + + // If we had a forwarding failure due to the HTLC not satisfying the + // current policy, then we'll send back an error, but ensure we send + // back the error sourced at the *target* link. + if len(destinations) == 0 { + // At this point, some or all of the links rejected the HTLC so + // we couldn't forward it. So we'll try to look up the error + // that came from the source. + linkErr, ok := linkErrs[packet.outgoingChanID] + if !ok { + // If we can't find the error of the source, then we'll + // return an unknown next peer, though this should + // never happen. + linkErr = NewLinkError( + &lnwire.FailUnknownNextPeer{}, + ) + log.Warnf("unable to find err source for "+ + "outgoing_link=%v, errors=%v", + packet.outgoingChanID, + lnutils.SpewLogClosure(linkErrs)) + } + + log.Tracef("incoming HTLC(%x) violated "+ + "target outgoing link (id=%v) policy: %v", + htlc.PaymentHash[:], packet.outgoingChanID, + linkErr) + + return s.failAddPacket(packet, linkErr) + } + + // Choose a random link out of the set of links that can forward this + // htlc. The reason for randomization is to evenly distribute the htlc + // load without making assumptions about what the best channel is. + destination := destinations[rand.Intn(len(destinations))] // nolint:gosec + + // Retrieve the incoming link by its ShortChannelID. Note that the + // incomingChanID is never set to hop.Source here. + s.indexMtx.RLock() + incomingLink, err := s.getLinkByShortID(packet.incomingChanID) + s.indexMtx.RUnlock() + if err != nil { + // If we couldn't find the incoming link, we can't evaluate the + // incoming's exposure to dust, so we just fail the HTLC back. + linkErr := NewLinkError( + &lnwire.FailTemporaryChannelFailure{}, + ) + + return s.failAddPacket(packet, linkErr) + } + + // Evaluate whether this HTLC would increase our fee exposure over the + // threshold on the incoming link. If it does, fail it backwards. 
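+	// Dust HTLCs aren't materialized as outputs on the commitment
+	// transaction; their value is absorbed into the commitment fee and
+	// would be burned to fees on a force close, so we bound the total
+	// amount that can be at risk this way.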
+ if s.dustExceedsFeeThreshold( + incomingLink, packet.incomingAmount, true, + ) { + // The incoming dust exceeds the threshold, so we fail the add + // back. + linkErr := NewLinkError( + &lnwire.FailTemporaryChannelFailure{}, + ) + + return s.failAddPacket(packet, linkErr) + } + + // Also evaluate whether this HTLC would increase our fee exposure over + // the threshold on the destination link. If it does, fail it back. + if s.dustExceedsFeeThreshold( + destination, packet.amount, false, + ) { + // The outgoing dust exceeds the threshold, so we fail the add + // back. + linkErr := NewLinkError( + &lnwire.FailTemporaryChannelFailure{}, + ) + + return s.failAddPacket(packet, linkErr) + } + + // Send the packet to the destination channel link which manages the + // channel. + packet.outgoingChanID = destination.ShortChanID() + + return destination.handleSwitchPacket(packet) +} From 2fc79d894630529cb5369737444efca7997c7397 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 13 Nov 2023 18:42:44 +0800 Subject: [PATCH 06/15] htlcswitch: handle forwarding settle and fail seperately This commit adds two methods, `handlePacketFail` and `handlePacketSettle` to handle the settle and fail packets differently. --- htlcswitch/switch.go | 272 +++++++++++++++++++++++++++---------------- 1 file changed, 169 insertions(+), 103 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 425f997867..9ed4202eec 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1099,118 +1099,26 @@ func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter, // updates back. This behaviour is achieved by creation of payment circuits. func (s *Switch) handlePacketForward(packet *htlcPacket) error { switch htlc := packet.htlc.(type) { - // Channel link forwarded us a new htlc, therefore we initiate the // payment circuit within our internal state so we can properly forward // the ultimate settle message back latter. case *lnwire.UpdateAddHTLC: return s.handlePacketAdd(packet, htlc) - case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: - // If the source of this packet has not been set, use the - // circuit map to lookup the origin. - circuit, err := s.closeCircuit(packet) - if err != nil { - return err - } - - // closeCircuit returns a nil circuit when a settle packet returns an - // ErrUnknownCircuit error upon the inner call to CloseCircuit. - if circuit == nil { - return nil - } - - fail, isFail := htlc.(*lnwire.UpdateFailHTLC) - if isFail && !packet.hasSource { - // HTLC resolutions and messages restored from disk - // don't have the obfuscator set from the original htlc - // add packet - set it here for use in blinded errors. - packet.obfuscator = circuit.ErrorEncrypter - - switch { - // No message to encrypt, locally sourced payment. - case circuit.ErrorEncrypter == nil: - - // If this is a resolution message, then we'll need to - // encrypt it as it's actually internally sourced. - case packet.isResolution: - var err error - // TODO(roasbeef): don't need to pass actually? - failure := &lnwire.FailPermanentChannelFailure{} - fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop( - failure, - ) - if err != nil { - err = fmt.Errorf("unable to obfuscate "+ - "error: %v", err) - log.Error(err) - } - - // Alternatively, if the remote party send us an - // UpdateFailMalformedHTLC, then we'll need to convert - // this into a proper well formatted onion error as - // there's no HMAC currently. 
- case packet.convertedError: - log.Infof("Converting malformed HTLC error "+ - "for circuit for Circuit(%x: "+ - "(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash, - packet.incomingChanID, packet.incomingHTLCID, - packet.outgoingChanID, packet.outgoingHTLCID) - - fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError( - fail.Reason, - ) - - default: - // Otherwise, it's a forwarded error, so we'll perform a - // wrapper encryption as normal. - fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt( - fail.Reason, - ) - } - } else if !isFail && circuit.Outgoing != nil { - // If this is an HTLC settle, and it wasn't from a - // locally initiated HTLC, then we'll log a forwarding - // event so we can flush it to disk later. - // - // TODO(roasbeef): only do this once link actually - // fully settles? - localHTLC := packet.incomingChanID == hop.Source - if !localHTLC { - log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+ - "from IncomingChanID(%v) to OutgoingChanID(%v)", - circuit.PaymentHash[:], circuit.OutgoingAmount, - circuit.IncomingAmount-circuit.OutgoingAmount, - circuit.Incoming.ChanID, circuit.Outgoing.ChanID) - s.fwdEventMtx.Lock() - s.pendingFwdingEvents = append( - s.pendingFwdingEvents, - channeldb.ForwardingEvent{ - Timestamp: time.Now(), - IncomingChanID: circuit.Incoming.ChanID, - OutgoingChanID: circuit.Outgoing.ChanID, - AmtIn: circuit.IncomingAmount, - AmtOut: circuit.OutgoingAmount, - }, - ) - s.fwdEventMtx.Unlock() - } - } - - // A blank IncomingChanID in a circuit indicates that it is a pending - // user-initiated payment. - if packet.incomingChanID == hop.Source { - s.wg.Add(1) - go s.handleLocalResponse(packet) - return nil - } + case *lnwire.UpdateFulfillHTLC: + return s.handlePacketSettle(packet) - // Check to see that the source link is online before removing - // the circuit. - return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) + // Channel link forwarded us an update_fail_htlc message. + // + // NOTE: when the channel link receives an update_fail_malformed_htlc + // from upstream, it will convert the message into update_fail_htlc and + // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC` + // here. + case *lnwire.UpdateFailHTLC: + return s.handlePacketFail(packet, htlc) default: - return errors.New("wrong update type") + return fmt.Errorf("wrong update type: %T", htlc) } } @@ -3051,3 +2959,161 @@ func (s *Switch) handlePacketAdd(packet *htlcPacket, return destination.handleSwitchPacket(packet) } + +// handlePacketSettle handles forwarding a settle packet. +func (s *Switch) handlePacketSettle(packet *htlcPacket) error { + // If the source of this packet has not been set, use the circuit map + // to lookup the origin. + circuit, err := s.closeCircuit(packet) + if err != nil { + return err + } + + // closeCircuit returns a nil circuit when a settle packet returns an + // ErrUnknownCircuit error upon the inner call to CloseCircuit. + // + // NOTE: We can only get a nil circuit when it has already been deleted + // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck` + // is received, which invokes `processRemoteSettleFails` in its link. + if circuit == nil { + log.Debugf("Found nil circuit: packet=%v", spew.Sdump(packet)) + return nil + } + + localHTLC := packet.incomingChanID == hop.Source + + // If this is a locally initiated HTLC, we need to handle the packet by + // storing the network result. + // + // A blank IncomingChanID in a circuit indicates that it is a pending + // user-initiated payment. 
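+	// Its result is handed back to the payment lifecycle through the
+	// network result store rather than being forwarded to another link.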
+ // + // NOTE: `closeCircuit` modifies the state of `packet`. + if localHTLC { + // TODO(yy): remove the goroutine and send back the error here. + s.wg.Add(1) + go s.handleLocalResponse(packet) + + // If this is a locally initiated HTLC, there's no need to + // forward it so we exit. + return nil + } + + // If this is an HTLC settle, and it wasn't from a locally initiated + // HTLC, then we'll log a forwarding event so we can flush it to disk + // later. + if circuit.Outgoing != nil { + log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+ + "from IncomingChanID(%v) to OutgoingChanID(%v)", + circuit.PaymentHash[:], circuit.OutgoingAmount, + circuit.IncomingAmount-circuit.OutgoingAmount, + circuit.Incoming.ChanID, circuit.Outgoing.ChanID) + + s.fwdEventMtx.Lock() + s.pendingFwdingEvents = append( + s.pendingFwdingEvents, + channeldb.ForwardingEvent{ + Timestamp: time.Now(), + IncomingChanID: circuit.Incoming.ChanID, + OutgoingChanID: circuit.Outgoing.ChanID, + AmtIn: circuit.IncomingAmount, + AmtOut: circuit.OutgoingAmount, + }, + ) + s.fwdEventMtx.Unlock() + } + + // Deliver this packet. + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) +} + +// handlePacketFail handles forwarding a fail packet. +func (s *Switch) handlePacketFail(packet *htlcPacket, + htlc *lnwire.UpdateFailHTLC) error { + + // If the source of this packet has not been set, use the circuit map + // to lookup the origin. + circuit, err := s.closeCircuit(packet) + if err != nil { + return err + } + + // If this is a locally initiated HTLC, we need to handle the packet by + // storing the network result. + // + // A blank IncomingChanID in a circuit indicates that it is a pending + // user-initiated payment. + // + // NOTE: `closeCircuit` modifies the state of `packet`. + if packet.incomingChanID == hop.Source { + // TODO(yy): remove the goroutine and send back the error here. + s.wg.Add(1) + go s.handleLocalResponse(packet) + + // If this is a locally initiated HTLC, there's no need to + // forward it so we exit. + return nil + } + + // Exit early if this hasSource is true. This flag is only set via + // mailbox's `FailAdd`. This method has two callsites, + // - the packet has timed out after `MailboxDeliveryTimeout`, defaults + // to 1 min. + // - the HTLC fails the validation in `channel.AddHTLC`. + // In either case, the `Reason` field is populated. Thus there's no + // need to proceed and extract the failure reason below. + if packet.hasSource { + // Deliver this packet. + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) + } + + // HTLC resolutions and messages restored from disk don't have the + // obfuscator set from the original htlc add packet - set it here for + // use in blinded errors. + packet.obfuscator = circuit.ErrorEncrypter + + switch { + // No message to encrypt, locally sourced payment. + case circuit.ErrorEncrypter == nil: + // TODO(yy) further check this case as we shouldn't end up here + // as `isLocal` is already false. + + // If this is a resolution message, then we'll need to encrypt it as + // it's actually internally sourced. + case packet.isResolution: + var err error + // TODO(roasbeef): don't need to pass actually? 
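+		//
+		// A resolution packet originates from our own on-chain
+		// resolver, so there is no downstream failure reason to
+		// re-wrap; we encrypt a fresh failure as if we were the
+		// error's first hop.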
+ failure := &lnwire.FailPermanentChannelFailure{} + htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop( + failure, + ) + if err != nil { + err = fmt.Errorf("unable to obfuscate error: %w", err) + log.Error(err) + } + + // Alternatively, if the remote party sends us an + // UpdateFailMalformedHTLC, then we'll need to convert this into a + // proper well formatted onion error as there's no HMAC currently. + case packet.convertedError: + log.Infof("Converting malformed HTLC error for circuit for "+ + "Circuit(%x: (%s, %d) <-> (%s, %d))", + packet.circuit.PaymentHash, + packet.incomingChanID, packet.incomingHTLCID, + packet.outgoingChanID, packet.outgoingHTLCID) + + htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError( + htlc.Reason, + ) + + default: + // Otherwise, it's a forwarded error, so we'll perform a + // wrapper encryption as normal. + htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt( + htlc.Reason, + ) + } + + // Deliver this packet. + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) +} From 3b6e28d19b992d71b1ab8fd40b46e37660e9c6a1 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 15 Nov 2023 03:58:54 +0800 Subject: [PATCH 07/15] channeldb+htlcswitch: make sure circuit is not nil in `teardownCircuit` --- channeldb/forwarding_package.go | 5 +++ htlcswitch/link.go | 2 +- htlcswitch/switch.go | 57 +++++++++++++-------------------- 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index 09f5671cc9..4c447fc035 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -211,6 +211,11 @@ func (f *PkgFilter) Decode(r io.Reader) error { return err } +// String returns a human-readable string. +func (f *PkgFilter) String() string { + return fmt.Sprintf("count=%v, filter=%v", f.count, f.filter) +} + // FwdPkg records all adds, settles, and fails that were locked in as a result // of the remote peer sending us a revocation. Each package is identified by // the short chanid and remote commitment height corresponding to the revocation diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 7b7688d9c0..337ce636cd 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3297,7 +3297,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, return } - l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter) + l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter) var switchPackets []*htlcPacket for i, pd := range settleFails { diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 9ed4202eec..1ac8f93a56 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -967,7 +967,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) { // This can only happen if the circuit is still open, which is why this // ordering is chosen. 
if err := s.teardownCircuit(pkt); err != nil { - log.Warnf("Unable to teardown circuit %s: %v", + log.Errorf("Unable to teardown circuit %s: %v", pkt.inKey(), err) return } @@ -1359,48 +1359,35 @@ func (s *Switch) teardownCircuit(pkt *htlcPacket) error { case *lnwire.UpdateFailHTLC: pktType = "FAIL" default: - err := fmt.Errorf("cannot tear down packet of type: %T", htlc) - log.Errorf(err.Error()) - return err + return fmt.Errorf("cannot tear down packet of type: %T", htlc) } - switch { - case pkt.circuit.HasKeystone(): - log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+ - "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) + var paymentHash lntypes.Hash - err := s.circuits.DeleteCircuits(pkt.inKey()) - if err != nil { - log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+ - "with payment_hash-%v using %s pkt", - pkt.incomingChanID, pkt.incomingHTLCID, - pkt.outgoingChanID, pkt.outgoingHTLCID, - pkt.circuit.PaymentHash, pktType) - return err - } - - log.Debugf("Closed completed %s circuit for %x: "+ - "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, - pkt.incomingChanID, pkt.incomingHTLCID, - pkt.outgoingChanID, pkt.outgoingHTLCID) + // Perform a defensive check to make sure we don't try to access a nil + // circuit. + circuit := pkt.circuit + if circuit != nil { + copy(paymentHash[:], circuit.PaymentHash[:]) + } - default: - log.Debugf("Tearing down incomplete circuit with %s for inkey=%v", - pktType, pkt.inKey()) + log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+ + "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) - err := s.circuits.DeleteCircuits(pkt.inKey()) - if err != nil { - log.Warnf("Failed to tear down pending %s circuit for %x: "+ - "(%s, %d)", pktType, pkt.circuit.PaymentHash, - pkt.incomingChanID, pkt.incomingHTLCID) - return err - } + err := s.circuits.DeleteCircuits(pkt.inKey()) + if err != nil { + log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+ + "with payment_hash=%v using %s pkt", pkt.incomingChanID, + pkt.incomingHTLCID, pkt.outgoingChanID, + pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType) - log.Debugf("Removed pending onion circuit for %x: "+ - "(%s, %d)", pkt.circuit.PaymentHash, - pkt.incomingChanID, pkt.incomingHTLCID) + return err } + log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType, + paymentHash, pkt.incomingChanID, pkt.incomingHTLCID, + pkt.outgoingChanID, pkt.outgoingHTLCID) + return nil } From bbf58ab444a46a598b359d0bdf006423f933e446 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 13 Nov 2023 17:00:47 +0800 Subject: [PATCH 08/15] routing: add new method `resumePayments` to handle payments during startup --- routing/router.go | 176 +++++++++++++++++++++++++--------------------- 1 file changed, 94 insertions(+), 82 deletions(-) diff --git a/routing/router.go b/routing/router.go index 4e8e0846b7..e5c55e6093 100644 --- a/routing/router.go +++ b/routing/router.go @@ -336,88 +336,8 @@ func (r *ChannelRouter) Start() error { // If any payments are still in flight, we resume, to make sure their // results are properly handled. - payments, err := r.cfg.Control.FetchInFlightPayments() - if err != nil { - return err - } - - // Before we restart existing payments and start accepting more - // payments to be made, we clean the network result store of the - // Switch. We do this here at startup to ensure no more payments can be - // made concurrently, so we know the toKeep map will be up-to-date - // until the cleaning has finished. 
- toKeep := make(map[uint64]struct{}) - for _, p := range payments { - for _, a := range p.HTLCs { - toKeep[a.AttemptID] = struct{}{} - } - } - - log.Debugf("Cleaning network result store.") - if err := r.cfg.Payer.CleanStore(toKeep); err != nil { - return err - } - - for _, payment := range payments { - log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier) - r.wg.Add(1) - go func(payment *channeldb.MPPayment) { - defer r.wg.Done() - - // Get the hashes used for the outstanding HTLCs. - htlcs := make(map[uint64]lntypes.Hash) - for _, a := range payment.HTLCs { - a := a - - // We check whether the individual attempts - // have their HTLC hash set, if not we'll fall - // back to the overall payment hash. - hash := payment.Info.PaymentIdentifier - if a.Hash != nil { - hash = *a.Hash - } - - htlcs[a.AttemptID] = hash - } - - // Since we are not supporting creating more shards - // after a restart (only receiving the result of the - // shards already outstanding), we create a simple - // shard tracker that will map the attempt IDs to - // hashes used for the HTLCs. This will be enough also - // for AMP payments, since we only need the hashes for - // the individual HTLCs to regenerate the circuits, and - // we don't currently persist the root share necessary - // to re-derive them. - shardTracker := shards.NewSimpleShardTracker( - payment.Info.PaymentIdentifier, htlcs, - ) - - // We create a dummy, empty payment session such that - // we won't make another payment attempt when the - // result for the in-flight attempt is received. - paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() - - // We pass in a non-timeout context, to indicate we - // don't need it to timeout. It will stop immediately - // after the existing attempt has finished anyway. We - // also set a zero fee limit, as no more routes should - // be tried. - noTimeout := time.Duration(0) - _, _, err := r.sendPayment( - context.Background(), 0, - payment.Info.PaymentIdentifier, noTimeout, - paySession, shardTracker, - ) - if err != nil { - log.Errorf("Resuming payment %v failed: %v.", - payment.Info.PaymentIdentifier, err) - return - } - - log.Infof("Resumed payment %v completed.", - payment.Info.PaymentIdentifier) - }(payment) + if err := r.resumePayments(); err != nil { + log.Error("Failed to resume payments during startup") } return nil @@ -1451,6 +1371,98 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi], ) } +// resumePayments fetches inflight payments and resumes their payment +// lifecycles. +func (r *ChannelRouter) resumePayments() error { + // Get all payments that are inflight. + payments, err := r.cfg.Control.FetchInFlightPayments() + if err != nil { + return err + } + + // Before we restart existing payments and start accepting more + // payments to be made, we clean the network result store of the + // Switch. We do this here at startup to ensure no more payments can be + // made concurrently, so we know the toKeep map will be up-to-date + // until the cleaning has finished. + toKeep := make(map[uint64]struct{}) + for _, p := range payments { + for _, a := range p.HTLCs { + toKeep[a.AttemptID] = struct{}{} + } + } + + log.Debugf("Cleaning network result store.") + if err := r.cfg.Payer.CleanStore(toKeep); err != nil { + return err + } + + // launchPayment is a helper closure that handles resuming the payment. + launchPayment := func(payment *channeldb.MPPayment) { + defer r.wg.Done() + + // Get the hashes used for the outstanding HTLCs. 
+ htlcs := make(map[uint64]lntypes.Hash) + for _, a := range payment.HTLCs { + a := a + + // We check whether the individual attempts have their + // HTLC hash set, if not we'll fall back to the overall + // payment hash. + hash := payment.Info.PaymentIdentifier + if a.Hash != nil { + hash = *a.Hash + } + + htlcs[a.AttemptID] = hash + } + + payHash := payment.Info.PaymentIdentifier + + // Since we are not supporting creating more shards after a + // restart (only receiving the result of the shards already + // outstanding), we create a simple shard tracker that will map + // the attempt IDs to hashes used for the HTLCs. This will be + // enough also for AMP payments, since we only need the hashes + // for the individual HTLCs to regenerate the circuits, and we + // don't currently persist the root share necessary to + // re-derive them. + shardTracker := shards.NewSimpleShardTracker(payHash, htlcs) + + // We create a dummy, empty payment session such that we won't + // make another payment attempt when the result for the + // in-flight attempt is received. + paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() + + // We pass in a non-timeout context, to indicate we don't need + // it to timeout. It will stop immediately after the existing + // attempt has finished anyway. We also set a zero fee limit, + // as no more routes should be tried. + noTimeout := time.Duration(0) + _, _, err := r.sendPayment( + context.Background(), 0, payHash, noTimeout, paySession, + shardTracker, + ) + if err != nil { + log.Errorf("Resuming payment %v failed: %v", payHash, + err) + + return + } + + log.Infof("Resumed payment %v completed", payHash) + } + + for _, payment := range payments { + log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier) + + r.wg.Add(1) + go launchPayment(payment) + } + + return nil +} + // getEdgeUnifiers returns a list of edge unifiers for the given route. func getEdgeUnifiers(source route.Vertex, hops []route.Vertex, outgoingChans map[uint64]struct{}, From b998ce11f18ab9f43575fc83d1193e9312105306 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Nov 2023 20:53:19 +0800 Subject: [PATCH 09/15] routing+htlcswitch: add new interface method `HasAttemptResult` --- htlcswitch/switch.go | 15 +++++++++++++++ routing/mock_test.go | 17 +++++++++++++++++ routing/router.go | 10 ++++++++++ 3 files changed, 42 insertions(+) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 1ac8f93a56..0cd3d7c3db 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -431,6 +431,21 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro } } +// HasAttemptResult reads the network result store to fetch the specified +// attempt. Returns true if the attempt result exists. +func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) { + _, err := s.networkResults.getResult(attemptID) + if err == nil { + return true, nil + } + + if !errors.Is(err, ErrPaymentIDNotFound) { + return false, err + } + + return false, nil +} + // GetAttemptResult returns the result of the HTLC attempt with the given // attemptID. The paymentHash should be set to the payment's overall hash, or // in case of AMP payments the payment's unique identifier. 
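A brief usage note on the two lookups, since the difference is easy to miss: `HasAttemptResult` is a cheap, non-blocking existence check against the network result store, while `GetAttemptResult` subscribes and returns a channel that delivers the result once it is available. The sketch below is illustrative only, written from the perspective of a `PaymentAttemptDispatcher` caller such as the router; the `payer`, `hash`, and `deobfuscator` values (and the required imports) are assumed to be in scope.

// waitIfNeeded is a hypothetical helper: it first performs the non-blocking
// existence check, and only subscribes for the asynchronous result when
// nothing has been stored yet.
func waitIfNeeded(payer routing.PaymentAttemptDispatcher, attemptID uint64,
	hash lntypes.Hash, deobfuscator htlcswitch.ErrorDecrypter) error {

	// Ask whether a result has already been persisted for this attempt.
	have, err := payer.HasAttemptResult(attemptID)
	if err != nil {
		return err
	}

	if !have {
		// Nothing stored yet: subscribe and block until the switch
		// delivers the outcome of the HTLC attempt.
		resultChan, err := payer.GetAttemptResult(
			attemptID, hash, deobfuscator,
		)
		if err != nil {
			return err
		}

		result := <-resultChan
		_ = result
	}

	return nil
}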
diff --git a/routing/mock_test.go b/routing/mock_test.go index f712c420de..306c182107 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -60,6 +60,12 @@ func (m *mockPaymentAttemptDispatcherOld) SendHTLC( return nil } +func (m *mockPaymentAttemptDispatcherOld) HasAttemptResult( + attemptID uint64) (bool, error) { + + return false, nil +} + func (m *mockPaymentAttemptDispatcherOld) GetAttemptResult(paymentID uint64, _ lntypes.Hash, _ htlcswitch.ErrorDecrypter) ( <-chan *htlcswitch.PaymentResult, error) { @@ -209,6 +215,10 @@ func (m *mockPayerOld) SendHTLC(_ lnwire.ShortChannelID, } +func (m *mockPayerOld) HasAttemptResult(attemptID uint64) (bool, error) { + return false, nil +} + func (m *mockPayerOld) GetAttemptResult(paymentID uint64, _ lntypes.Hash, _ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) { @@ -585,6 +595,13 @@ func (m *mockPaymentAttemptDispatcher) SendHTLC(firstHop lnwire.ShortChannelID, return args.Error(0) } +func (m *mockPaymentAttemptDispatcher) HasAttemptResult( + attemptID uint64) (bool, error) { + + args := m.Called(attemptID) + return args.Bool(0), args.Error(1) +} + func (m *mockPaymentAttemptDispatcher) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, deobfuscator htlcswitch.ErrorDecrypter) ( <-chan *htlcswitch.PaymentResult, error) { diff --git a/routing/router.go b/routing/router.go index e5c55e6093..c121982772 100644 --- a/routing/router.go +++ b/routing/router.go @@ -135,6 +135,16 @@ type PaymentAttemptDispatcher interface { // NOTE: New payment attempts MUST NOT be made after the keepPids map // has been created and this method has returned. CleanStore(keepPids map[uint64]struct{}) error + + // HasAttemptResult reads the network result store to fetch the + // specified attempt. Returns true if the attempt result exists. + // + // NOTE: This method is used and should only be used by the router to + // resume payments during startup. It can be viewed as a subset of + // `GetAttemptResult` in terms of its functionality, and can be removed + // once we move the construction of `UpdateAddHTLC` and + // `ErrorDecrypter` into `htlcswitch`. + HasAttemptResult(attemptID uint64) (bool, error) } // PaymentSessionSource is an interface that defines a source for the router to From c2f7e6a66f15baadf785f5bc7aa37c181d1dc20a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Nov 2023 20:53:43 +0800 Subject: [PATCH 10/15] routing: fail stale HTLC attempts during startup This commit adds a check during router's startup and fails the inflight HTLCs if they are routing using channels unknown to us. The channels are unknown because they are already closed, usually long time ago. --- routing/router.go | 128 +++++++++++++++++++++++++++++++++++++++++ routing/router_test.go | 16 +++++- server.go | 25 ++++---- 3 files changed, 154 insertions(+), 15 deletions(-) diff --git a/routing/router.go b/routing/router.go index c121982772..5862ed12e2 100644 --- a/routing/router.go +++ b/routing/router.go @@ -285,6 +285,13 @@ type Config struct { // ApplyChannelUpdate can be called to apply a new channel update to the // graph that we received from a payment failure. ApplyChannelUpdate func(msg *lnwire.ChannelUpdate) bool + + // FetchClosedChannels is used by the router to fetch closed channels. + // + // TODO(yy): remove this method once the root cause of stuck payments + // is found. + FetchClosedChannels func(pendingOnly bool) ( + []*channeldb.ChannelCloseSummary, error) } // EdgeLocator is a struct used to identify a specific edge. 
@@ -1384,6 +1391,19 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi], // resumePayments fetches inflight payments and resumes their payment // lifecycles. func (r *ChannelRouter) resumePayments() error { + // Get a list of closed channels. + channels, err := r.cfg.FetchClosedChannels(false) + if err != nil { + return err + } + + closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels)) + for _, c := range channels { + if !c.IsPending { + closedSCIDs[c.ShortChanID] = struct{}{} + } + } + // Get all payments that are inflight. payments, err := r.cfg.Control.FetchInFlightPayments() if err != nil { @@ -1399,6 +1419,12 @@ func (r *ChannelRouter) resumePayments() error { for _, p := range payments { for _, a := range p.HTLCs { toKeep[a.AttemptID] = struct{}{} + + // Try to fail the attempt if the route contains a dead + // channel. + r.failStaleAttempt( + a, p.Info.PaymentIdentifier, closedSCIDs, + ) } } @@ -1473,6 +1499,108 @@ func (r *ChannelRouter) resumePayments() error { return nil } +// failStaleAttempt will fail an HTLC attempt if it's using an unknown channel +// in its route. It first consults the switch to see if there's already a +// network result stored for this attempt. If not, it will check whether the +// first hop of this attempt is using an active channel known to us. If +// inactive, this attempt will be failed. +// +// NOTE: there's an unknown bug that caused the network result for a particular +// attempt to NOT be saved, resulting a payment being stuck forever. More info: +// - https://github.com/lightningnetwork/lnd/issues/8146 +// - https://github.com/lightningnetwork/lnd/pull/8174 +func (r *ChannelRouter) failStaleAttempt(a channeldb.HTLCAttempt, + payHash lntypes.Hash, closedSCIDs map[lnwire.ShortChannelID]struct{}) { + + // We can only fail inflight HTLCs so we skip the settled/failed ones. + if a.Failure != nil || a.Settle != nil { + return + } + + // First, check if we've already had a network result for this attempt. + // If no result is found, we'll check whether the reference link is + // still known to us. + ok, err := r.cfg.Payer.HasAttemptResult(a.AttemptID) + if err != nil { + log.Errorf("Failed to fetch network result for attempt=%v", + a.AttemptID) + return + } + + // There's already a network result, no need to fail it here as the + // payment lifecycle will take care of it, so we can exit early. + if ok { + log.Debugf("Already have network result for attempt=%v", + a.AttemptID) + return + } + + // We now need to decide whether this attempt should be failed here. + // For very old payments, it's possible that the network results were + // never saved, causing the payments to be stuck inflight. We now check + // whether the first hop is referencing an active channel ID and, if + // not, we will fail the attempt as it has no way to be retried again. + var shouldFail bool + + // Validate that the attempt has hop info. If this attempt has no hop + // info it indicates an error in our db. + if len(a.Route.Hops) == 0 { + log.Errorf("Found empty hop for attempt=%v", a.AttemptID) + + shouldFail = true + } else { + // Get the short channel ID. + chanID := a.Route.Hops[0].ChannelID + scid := lnwire.NewShortChanIDFromInt(chanID) + + // Check whether this link is active. If so, we won't fail the + // attempt but keep waiting for its result. + _, err := r.cfg.GetLink(scid) + if err == nil { + return + } + + // We should get the link not found err. 
If not, we will log an + // error and skip failing this attempt since an unknown error + // occurred. + if !errors.Is(err, htlcswitch.ErrChannelLinkNotFound) { + log.Errorf("Failed to get link for attempt=%v for "+ + "payment=%v: %v", a.AttemptID, payHash, err) + + return + } + + // The channel link is not active, we now check whether this + // channel is already closed. If so, we fail it as there's no + // need to wait for the network result because it won't be + // re-sent. If the channel is still pending, we'll keep waiting + // for the result as we may get a contract resolution for this + // HTLC. + if _, ok := closedSCIDs[scid]; ok { + shouldFail = true + } + } + + // Exit if there's no need to fail. + if !shouldFail { + return + } + + log.Errorf("Failing stale attempt=%v for payment=%v", a.AttemptID, + payHash) + + // Fail the attempt in db. If there's an error, there's nothing we can + // do here but logging it. + failInfo := &channeldb.HTLCFailInfo{ + Reason: channeldb.HTLCFailUnknown, + FailTime: r.cfg.Clock.Now(), + } + _, err = r.cfg.Control.FailAttempt(payHash, a.AttemptID, failInfo) + if err != nil { + log.Errorf("Fail attempt=%v got error: %v", a.AttemptID, err) + } +} + // getEdgeUnifiers returns a list of edge unifiers for the given route. func getEdgeUnifiers(source route.Vertex, hops []route.Vertex, outgoingChans map[uint64]struct{}, diff --git a/routing/router_test.go b/routing/router_test.go index 3a9b97d290..72c7b206b0 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -93,6 +93,10 @@ func (c *testCtx) getChannelIDFromAlias(t *testing.T, a, b string) uint64 { return channelID } +func mockFetchClosedChannels(_ bool) ([]*channeldb.ChannelCloseSummary, error) { + return nil, nil +} + func createTestCtxFromGraphInstance(t *testing.T, startingHeight uint32, graphInstance *testGraphInstance) *testCtx { @@ -158,9 +162,10 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, next := atomic.AddUint64(&uniquePaymentID, 1) return next, nil }, - PathFindingConfig: pathFindingConfig, - Clock: clock.NewTestClock(time.Unix(1, 0)), - ApplyChannelUpdate: graphBuilder.ApplyChannelUpdate, + PathFindingConfig: pathFindingConfig, + Clock: clock.NewTestClock(time.Unix(1, 0)), + ApplyChannelUpdate: graphBuilder.ApplyChannelUpdate, + FetchClosedChannels: mockFetchClosedChannels, }) require.NoError(t, router.Start(), "unable to start router") @@ -2170,6 +2175,7 @@ func TestSendToRouteSkipTempErrSuccess(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + FetchClosedChannels: mockFetchClosedChannels, }} // Register mockers with the expected method calls. @@ -2253,6 +2259,7 @@ func TestSendToRouteSkipTempErrNonMPP(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + FetchClosedChannels: mockFetchClosedChannels, }} // Expect an error to be returned. @@ -2307,6 +2314,7 @@ func TestSendToRouteSkipTempErrTempFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + FetchClosedChannels: mockFetchClosedChannels, }} // Create the error to be returned. @@ -2389,6 +2397,7 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + FetchClosedChannels: mockFetchClosedChannels, }} // Create the error to be returned. @@ -2475,6 +2484,7 @@ func TestSendToRouteTempFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + FetchClosedChannels: mockFetchClosedChannels, }} // Create the error to be returned. 
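As an aside for reviewers: the stale-attempt check above keys everything off `lnwire.ShortChannelID`, the same structured block-height/tx-index/output-index form recorded in the closed-channel summaries, which is why set membership is a plain map lookup. A small, self-contained sketch of the conversion it relies on; the numeric channel ID below is made up for illustration.

package main

import (
	"fmt"

	"github.com/lightningnetwork/lnd/lnwire"
)

func main() {
	// A route hop stores the channel as a raw uint64; this value is an
	// arbitrary example, not a real channel.
	var hopChanID uint64 = 812345678901248000

	// Convert it into the structured SCID form used as the map key.
	scid := lnwire.NewShortChanIDFromInt(hopChanID)
	fmt.Println(scid) // prints the block height, tx index and output index

	// A closed-channel set keyed the same way turns the check in
	// failStaleAttempt into a simple map membership test.
	closed := map[lnwire.ShortChannelID]struct{}{scid: {}}
	if _, ok := closed[scid]; ok {
		fmt.Println("channel is known to be closed")
	}
}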
diff --git a/server.go b/server.go
index a99591997f..ead67d1027 100644
--- a/server.go
+++ b/server.go
@@ -993,18 +993,19 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	}
 
 	s.chanRouter, err = routing.New(routing.Config{
-		SelfNode:           selfNode.PubKeyBytes,
-		RoutingGraph:       graphsession.NewRoutingGraph(chanGraph),
-		Chain:              cc.ChainIO,
-		Payer:              s.htlcSwitch,
-		Control:            s.controlTower,
-		MissionControl:     s.missionControl,
-		SessionSource:      paymentSessionSource,
-		GetLink:            s.htlcSwitch.GetLinkByShortID,
-		NextPaymentID:      sequencer.NextID,
-		PathFindingConfig:  pathFindingConfig,
-		Clock:              clock.NewDefaultClock(),
-		ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
+		SelfNode:            selfNode.PubKeyBytes,
+		RoutingGraph:        graphsession.NewRoutingGraph(chanGraph),
+		Chain:               cc.ChainIO,
+		Payer:               s.htlcSwitch,
+		Control:             s.controlTower,
+		MissionControl:      s.missionControl,
+		SessionSource:       paymentSessionSource,
+		GetLink:             s.htlcSwitch.GetLinkByShortID,
+		NextPaymentID:       sequencer.NextID,
+		PathFindingConfig:   pathFindingConfig,
+		Clock:               clock.NewDefaultClock(),
+		ApplyChannelUpdate:  s.graphBuilder.ApplyChannelUpdate,
+		FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("can't create router: %w", err)

From b998ce11f18ab9f43575fc83d1193e9312105306 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Thu, 16 Nov 2023 20:58:32 +0800
Subject: [PATCH 11/15] docs: update release notes re stuck payment fix

---
 docs/release-notes/release-notes-0.18.3.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/release-notes/release-notes-0.18.3.md b/docs/release-notes/release-notes-0.18.3.md
index 02fd7697d0..047d85ba93 100644
--- a/docs/release-notes/release-notes-0.18.3.md
+++ b/docs/release-notes/release-notes-0.18.3.md
@@ -48,6 +48,15 @@
   bumping an anchor channel closing was not possible when no HTLCs were on the
   commitment when the channel was force closed.
 
+* [Fixed](https://github.com/lightningnetwork/lnd/pull/8174) old payments that
+  are stuck inflight. Though the root cause is unknown, it's possible the
+  network result for a given HTLC attempt was not saved, which is now fixed.
+  Check
+  [here](https://github.com/lightningnetwork/lnd/pull/8174#issue-1992055103)
+  for the details about the analysis, and
+  [here](https://github.com/lightningnetwork/lnd/issues/8146) for a summary of
+  the issue.
+
 # New Features
 ## Functional Enhancements
 ## RPC Additions

From 188aa9a4d47fde1f037d63aa2b94f380f45fd2a2 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Fri, 17 Nov 2023 21:07:53 +0800
Subject: [PATCH 12/15] routing+lnd: prepare closed channel SCIDs in server

The method `FetchClosedChannels` sometimes prematurely marks a pending
force closing channel as finalized, therefore we need to further check
`FetchPendingChannels` to make sure the channel is indeed finalized.
---
 routing/router.go      | 39 +++++++---------------
 routing/router_test.go | 22 ++++++------
 server.go              | 75 ++++++++++++++++++++++++++++++++++--------
 3 files changed, 83 insertions(+), 53 deletions(-)

diff --git a/routing/router.go b/routing/router.go
index 5862ed12e2..94ceafaca1 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -286,12 +286,10 @@ type Config struct {
 	// graph that we received from a payment failure.
 	ApplyChannelUpdate func(msg *lnwire.ChannelUpdate) bool
 
-	// FetchClosedChannels is used by the router to fetch closed channels.
+	// ClosedSCIDs is used by the router to fetch closed channels.
 	//
-	// TODO(yy): remove this method once the root cause of stuck payments
-	// is found.
- FetchClosedChannels func(pendingOnly bool) ( - []*channeldb.ChannelCloseSummary, error) + // TODO(yy): remove it once the root cause of stuck payments is found. + ClosedSCIDs map[lnwire.ShortChannelID]struct{} } // EdgeLocator is a struct used to identify a specific edge. @@ -1391,19 +1389,6 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi], // resumePayments fetches inflight payments and resumes their payment // lifecycles. func (r *ChannelRouter) resumePayments() error { - // Get a list of closed channels. - channels, err := r.cfg.FetchClosedChannels(false) - if err != nil { - return err - } - - closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels)) - for _, c := range channels { - if !c.IsPending { - closedSCIDs[c.ShortChanID] = struct{}{} - } - } - // Get all payments that are inflight. payments, err := r.cfg.Control.FetchInFlightPayments() if err != nil { @@ -1422,9 +1407,7 @@ func (r *ChannelRouter) resumePayments() error { // Try to fail the attempt if the route contains a dead // channel. - r.failStaleAttempt( - a, p.Info.PaymentIdentifier, closedSCIDs, - ) + r.failStaleAttempt(a, p.Info.PaymentIdentifier) } } @@ -1510,7 +1493,7 @@ func (r *ChannelRouter) resumePayments() error { // - https://github.com/lightningnetwork/lnd/issues/8146 // - https://github.com/lightningnetwork/lnd/pull/8174 func (r *ChannelRouter) failStaleAttempt(a channeldb.HTLCAttempt, - payHash lntypes.Hash, closedSCIDs map[lnwire.ShortChannelID]struct{}) { + payHash lntypes.Hash) { // We can only fail inflight HTLCs so we skip the settled/failed ones. if a.Failure != nil || a.Settle != nil { @@ -1571,12 +1554,12 @@ func (r *ChannelRouter) failStaleAttempt(a channeldb.HTLCAttempt, } // The channel link is not active, we now check whether this - // channel is already closed. If so, we fail it as there's no - // need to wait for the network result because it won't be - // re-sent. If the channel is still pending, we'll keep waiting - // for the result as we may get a contract resolution for this - // HTLC. - if _, ok := closedSCIDs[scid]; ok { + // channel is already closed. If so, we fail the HTLC attempt + // as there's no need to wait for its network result because + // there's no link. If the channel is still pending, we'll keep + // waiting for the result as we may get a contract resolution + // for this HTLC. 
+ if _, ok := r.cfg.ClosedSCIDs[scid]; ok { shouldFail = true } } diff --git a/routing/router_test.go b/routing/router_test.go index 72c7b206b0..073761fa05 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -93,9 +93,7 @@ func (c *testCtx) getChannelIDFromAlias(t *testing.T, a, b string) uint64 { return channelID } -func mockFetchClosedChannels(_ bool) ([]*channeldb.ChannelCloseSummary, error) { - return nil, nil -} +var mockClosedSCIDs map[lnwire.ShortChannelID]struct{} func createTestCtxFromGraphInstance(t *testing.T, startingHeight uint32, graphInstance *testGraphInstance) *testCtx { @@ -162,10 +160,10 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, next := atomic.AddUint64(&uniquePaymentID, 1) return next, nil }, - PathFindingConfig: pathFindingConfig, - Clock: clock.NewTestClock(time.Unix(1, 0)), - ApplyChannelUpdate: graphBuilder.ApplyChannelUpdate, - FetchClosedChannels: mockFetchClosedChannels, + PathFindingConfig: pathFindingConfig, + Clock: clock.NewTestClock(time.Unix(1, 0)), + ApplyChannelUpdate: graphBuilder.ApplyChannelUpdate, + ClosedSCIDs: mockClosedSCIDs, }) require.NoError(t, router.Start(), "unable to start router") @@ -2175,7 +2173,7 @@ func TestSendToRouteSkipTempErrSuccess(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, - FetchClosedChannels: mockFetchClosedChannels, + ClosedSCIDs: mockClosedSCIDs, }} // Register mockers with the expected method calls. @@ -2259,7 +2257,7 @@ func TestSendToRouteSkipTempErrNonMPP(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, - FetchClosedChannels: mockFetchClosedChannels, + ClosedSCIDs: mockClosedSCIDs, }} // Expect an error to be returned. @@ -2314,7 +2312,7 @@ func TestSendToRouteSkipTempErrTempFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, - FetchClosedChannels: mockFetchClosedChannels, + ClosedSCIDs: mockClosedSCIDs, }} // Create the error to be returned. @@ -2397,7 +2395,7 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, - FetchClosedChannels: mockFetchClosedChannels, + ClosedSCIDs: mockClosedSCIDs, }} // Create the error to be returned. @@ -2484,7 +2482,7 @@ func TestSendToRouteTempFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, - FetchClosedChannels: mockFetchClosedChannels, + ClosedSCIDs: mockClosedSCIDs, }} // Create the error to be returned. 
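Note that `mockClosedSCIDs` above is deliberately left as a nil map: a lookup in a nil Go map simply reports the key as absent, so none of the existing tests hit the closed-channel branch. A test that wants to exercise that branch could hand the router a populated set instead; the helper below is hypothetical and the SCID value is arbitrary.

// closedSetForTest builds a single-entry closed-SCID set. A real test would
// use the SCID of the first hop of the attempt it expects to be failed, and
// pass the returned map via the ClosedSCIDs config field in place of
// mockClosedSCIDs.
func closedSetForTest() map[lnwire.ShortChannelID]struct{} {
	return map[lnwire.ShortChannelID]struct{}{
		lnwire.NewShortChanIDFromInt(123456): {},
	}
}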
diff --git a/server.go b/server.go index ead67d1027..555ee26274 100644 --- a/server.go +++ b/server.go @@ -993,19 +993,19 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.chanRouter, err = routing.New(routing.Config{ - SelfNode: selfNode.PubKeyBytes, - RoutingGraph: graphsession.NewRoutingGraph(chanGraph), - Chain: cc.ChainIO, - Payer: s.htlcSwitch, - Control: s.controlTower, - MissionControl: s.missionControl, - SessionSource: paymentSessionSource, - GetLink: s.htlcSwitch.GetLinkByShortID, - NextPaymentID: sequencer.NextID, - PathFindingConfig: pathFindingConfig, - Clock: clock.NewDefaultClock(), - ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate, - FetchClosedChannels: s.chanStateDB.FetchClosedChannels, + SelfNode: selfNode.PubKeyBytes, + RoutingGraph: graphsession.NewRoutingGraph(chanGraph), + Chain: cc.ChainIO, + Payer: s.htlcSwitch, + Control: s.controlTower, + MissionControl: s.missionControl, + SessionSource: paymentSessionSource, + GetLink: s.htlcSwitch.GetLinkByShortID, + NextPaymentID: sequencer.NextID, + PathFindingConfig: pathFindingConfig, + Clock: clock.NewDefaultClock(), + ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate, + ClosedSCIDs: s.fetchClosedChannelSCIDs(), }) if err != nil { return nil, fmt.Errorf("can't create router: %w", err) @@ -4830,3 +4830,52 @@ func shouldPeerBootstrap(cfg *Config) bool { // covering the bootstrapping process. return !cfg.NoNetBootstrap && !isDevNetwork } + +// fetchClosedChannelSCIDs returns a set of SCIDs that have their force closing +// finished. +func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} { + // Get a list of closed channels. + channels, err := s.chanStateDB.FetchClosedChannels(false) + if err != nil { + srvrLog.Errorf("Failed to fetch closed channels: %v", err) + return nil + } + + // Save the SCIDs in a map. + closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels)) + for _, c := range channels { + // If the channel is not pending, its FC has been finalized. + if !c.IsPending { + closedSCIDs[c.ShortChanID] = struct{}{} + } + } + + // Double check whether the reported closed channel has indeed finished + // closing. + // + // NOTE: There are misalignments regarding when a channel's FC is + // marked as finalized. We double check the pending channels to make + // sure the returned SCIDs are indeed terminated. + // + // TODO(yy): fix the misalignments in `FetchClosedChannels`. + pendings, err := s.chanStateDB.FetchPendingChannels() + if err != nil { + srvrLog.Errorf("Failed to fetch pending channels: %v", err) + return nil + } + + for _, c := range pendings { + if _, ok := closedSCIDs[c.ShortChannelID]; !ok { + continue + } + + // If the channel is still reported as pending, remove it from + // the map. + delete(closedSCIDs, c.ShortChannelID) + + srvrLog.Warnf("Channel=%v is prematurely marked as finalized", + c.ShortChannelID) + } + + return closedSCIDs +} From 7aba5cbc0aba769bdda183a2d52fb707199bf530 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 22 Jul 2024 21:46:42 +0800 Subject: [PATCH 13/15] routing: fix linter complains --- htlcswitch/switch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 0cd3d7c3db..efd469f785 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -2910,7 +2910,8 @@ func (s *Switch) handlePacketAdd(packet *htlcPacket, // Choose a random link out of the set of links that can forward this // htlc. 
The reason for randomization is to evenly distribute the htlc
 	// load without making assumptions about what the best channel is.
-	destination := destinations[rand.Intn(len(destinations))] // nolint:gosec
+	//nolint:gosec
+	destination := destinations[rand.Intn(len(destinations))]
 
 	// Retrieve the incoming link by its ShortChannelID. Note that the
 	// incomingChanID is never set to hop.Source here.

From 677f2c390a0846afd4aeb083f8067fe50d909b72 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Wed, 24 Jul 2024 03:10:31 +0800
Subject: [PATCH 14/15] lntest: re-define `CleanupForceClose`

This reflects the new sweeping behavior and also makes the helper easier
to use, as we need a method to quickly clean up force closes without
worrying about the details when we are not testing the force close
behavior.
---
 lntest/harness.go       | 25 ++++---------------------
 lntest/harness_miner.go | 23 ++++++++++++-----------
 2 files changed, 16 insertions(+), 32 deletions(-)

diff --git a/lntest/harness.go b/lntest/harness.go
index 3e7ae3fae8..e8996e9a64 100644
--- a/lntest/harness.go
+++ b/lntest/harness.go
@@ -1626,31 +1626,14 @@ func (h *HarnessTest) OpenChannelPsbt(srcNode, destNode *node.HarnessNode,
 	return respStream, upd.PsbtFund.Psbt
 }
 
-// CleanupForceClose mines a force close commitment found in the mempool and
-// the following sweep transaction from the force closing node.
+// CleanupForceClose mines blocks to clean up the force close process. This is
+// used for tests that are not asserting the expected behavior is found during
+// the force close process, e.g., num of sweeps, etc. Instead, it provides a
+// shortcut to move the test forward with a clean mempool.
 func (h *HarnessTest) CleanupForceClose(hn *node.HarnessNode) {
 	// Wait for the channel to be marked pending force close.
 	h.AssertNumPendingForceClose(hn, 1)
 
-	// Mine enough blocks for the node to sweep its funds from the force
-	// closed channel. The commit sweep resolver is able to offer the input
-	// to the sweeper at defaulCSV-1, and broadcast the sweep tx once one
-	// more block is mined.
-	//
-	// NOTE: we might empty blocks here as we don't know the exact number
-	// of blocks to mine. This may end up mining more blocks than needed.
-	h.MineEmptyBlocks(node.DefaultCSV - 1)
-
-	// Assert there is one pending sweep.
-	h.AssertNumPendingSweeps(hn, 1)
-
-	// Mine a block to trigger the sweep.
-	h.MineEmptyBlocks(1)
-
-	// The node should now sweep the funds, clean up by mining the sweeping
-	// tx.
-	h.MineBlocksAndAssertNumTxes(1, 1)
-
 	// Mine blocks to get any second level HTLC resolved. If there are no
 	// HTLCs, this will behave like h.AssertNumPendingCloseChannels.
 	h.mineTillForceCloseResolved(hn)
diff --git a/lntest/harness_miner.go b/lntest/harness_miner.go
index 65994d254f..bc9aef1805 100644
--- a/lntest/harness_miner.go
+++ b/lntest/harness_miner.go
@@ -167,16 +167,9 @@ func (h *HarnessTest) cleanMempool() {
 }
 
 // mineTillForceCloseResolved asserts that the number of pending close channels
-// are zero. Each time it checks, a new block is mined using MineBlocksSlow to
-// give the node some time to catch up the chain.
-//
-// NOTE: this method is a workaround to make sure we have a clean mempool at
-// the end of a channel force closure. We cannot directly mine blocks and
-// assert channels being fully closed because the subsystems in lnd don't share
-// the same block height. This is especially the case when blocks are produced
-// too fast.
-// TODO(yy): remove this workaround when syncing blocks are unified in all the
-// subsystems.
+// are zero. Each time it checks, an empty block is mined, followed by a +// mempool check to see if there are any sweeping txns. If found, these txns +// are then mined to clean up the mempool. func (h *HarnessTest) mineTillForceCloseResolved(hn *node.HarnessNode) { _, startHeight := h.GetBestBlock() @@ -184,7 +177,15 @@ func (h *HarnessTest) mineTillForceCloseResolved(hn *node.HarnessNode) { resp := hn.RPC.PendingChannels() total := len(resp.PendingForceClosingChannels) if total != 0 { - h.MineBlocks(1) + // Mine an empty block first. + h.MineEmptyBlocks(1) + + // If there are new sweeping txns, mine a block to + // confirm it. + mem := h.GetRawMempool() + if len(mem) != 0 { + h.MineBlocksAndAssertNumTxes(1, len(mem)) + } return fmt.Errorf("expected num of pending force " + "close channel to be zero") From bc31a8b36f3bbf9846afd86a1a270672c3e0ca8c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 6 Aug 2024 20:28:08 +0800 Subject: [PATCH 15/15] routing: fix doc string --- routing/router.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/routing/router.go b/routing/router.go index 94ceafaca1..ec134a15b8 100644 --- a/routing/router.go +++ b/routing/router.go @@ -262,9 +262,9 @@ type Config struct { // sessions. SessionSource PaymentSessionSource - // QueryBandwidth is a method that allows the router to query the lower - // link layer to determine the up-to-date available bandwidth at a - // prospective link to be traversed. If the link isn't available, then + // GetLink is a method that allows the router to query the lower link + // layer to determine the up-to-date available bandwidth at a + // prospective link to be traversed. If the link isn't available, then // a value of zero should be returned. Otherwise, the current up-to- // date knowledge of the available bandwidth of the link should be // returned.
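For readers of the whole series, the startup behavior it adds boils down to one decision per inflight HTLC attempt (settled or already-failed attempts are skipped before this check). The sketch below is an illustrative restatement with simplified parameters and elided error handling, not code from the patches; the authoritative logic is `resumePayments`/`failStaleAttempt` above.

// shouldFailStaleAttempt restates, in simplified form, the per-attempt
// decision made at startup: an attempt is failed only when it has no stored
// network result and either its route is missing hop info, or its first hop
// has no active link and its channel's force close has fully resolved.
func shouldFailStaleAttempt(hasResult, hasHops, linkActive bool,
	firstHop lnwire.ShortChannelID,
	closedSCIDs map[lnwire.ShortChannelID]struct{}) bool {

	// A stored network result means the payment lifecycle will resolve
	// this attempt on its own.
	if hasResult {
		return false
	}

	// Missing hop info indicates a bad record, so the attempt is failed
	// outright.
	if !hasHops {
		return true
	}

	// An active link may still deliver a result, so keep waiting.
	if linkActive {
		return false
	}

	// With no link, fail only if the channel is fully closed; a pending
	// force close may still produce a contract resolution for the HTLC.
	_, closed := closedSCIDs[firstHop]

	return closed
}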