Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watchtower: handle rogue updates #7981

Merged
merged 8 commits into from
Sep 18, 2023
Merged
6 changes: 5 additions & 1 deletion docs/release-notes/release-notes-0.17.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ fails](https://github.com/lightningnetwork/lnd/pull/7876).
retried](https://github.com/lightningnetwork/lnd/pull/7927) with an
exponential back off.


* In the watchtower client, we [now explicitly
handle](https://github.com/lightningnetwork/lnd/pull/7981) the scenario where
a channel is closed while we still have an in-memory update for it.

* `lnd` [now properly handles a case where an erroneous force close attempt
would impede start up](https://github.com/lightningnetwork/lnd/pull/7985).


# New Features
## Functional Enhancements

Expand Down
5 changes: 5 additions & 0 deletions lnrpc/wtclientrpc/wtclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ func constructFunctionalOptions(includeSessions,
return opts, ackCounts, committedUpdateCounts
}

perNumRogueUpdates := func(s *wtdb.ClientSession, numUpdates uint16) {
ackCounts[s.ID] += numUpdates
}

perNumAckedUpdates := func(s *wtdb.ClientSession, id lnwire.ChannelID,
numUpdates uint16) {

Expand All @@ -405,6 +409,7 @@ func constructFunctionalOptions(includeSessions,
opts = []wtdb.ClientSessionListOption{
wtdb.WithPerNumAckedUpdates(perNumAckedUpdates),
wtdb.WithPerCommittedUpdate(perCommittedUpdate),
wtdb.WithPerRogueUpdateCount(perNumRogueUpdates),
}

if excludeExhaustedSessions {
Expand Down
13 changes: 13 additions & 0 deletions watchtower/wtclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,19 @@ func (c *TowerClient) handleClosableSessions(
// and handle it.
c.closableSessionQueue.Pop()

// Stop the session and remove it from the
// in-memory set.
err := c.activeSessions.StopAndRemove(
item.sessionID,
)
if err != nil {
c.log.Errorf("could not remove "+
"session(%s) from in-memory "+
"set: %v", item.sessionID, err)

return
}

// Fetch the session from the DB so that we can
// extract the Tower info.
sess, err := c.cfg.DB.GetClientSession(
Expand Down
162 changes: 157 additions & 5 deletions watchtower/wtclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -72,7 +73,7 @@ var (

addrScript, _ = txscript.PayToAddrScript(addr)

waitTime = 5 * time.Second
waitTime = 15 * time.Second

defaultTxPolicy = wtpolicy.TxPolicy{
BlobType: blob.TypeAltruistCommit,
Expand Down Expand Up @@ -398,7 +399,7 @@ type testHarness struct {
cfg harnessCfg
signer *wtmock.MockSigner
capacity lnwire.MilliSatoshi
clientDB *wtmock.ClientDB
clientDB *wtdb.ClientDB
clientCfg *wtclient.Config
client wtclient.Client
server *serverHarness
Expand Down Expand Up @@ -426,10 +427,26 @@ type harnessCfg struct {
noServerStart bool
}

func newClientDB(t *testing.T) *wtdb.ClientDB {
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
dbCfg := &kvdb.BoltConfig{
DBTimeout: kvdb.DefaultDBTimeout,
}

// Construct the ClientDB.
dir := t.TempDir()
bdb, err := wtdb.NewBoltBackendCreator(true, dir, "wtclient.db")(dbCfg)
require.NoError(t, err)

clientDB, err := wtdb.OpenClientDB(bdb)
require.NoError(t, err)

return clientDB
}

func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
signer := wtmock.NewMockSigner()
mockNet := newMockNet()
clientDB := wtmock.NewClientDB()
clientDB := newClientDB(t)

server := newServerHarness(
t, mockNet, towerAddrStr, func(serverCfg *wtserver.Config) {
Expand Down Expand Up @@ -509,6 +526,7 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
h.startClient()
t.Cleanup(func() {
require.NoError(t, h.client.Stop())
require.NoError(t, h.clientDB.Close())
})

h.makeChannel(0, h.cfg.localBalance, h.cfg.remoteBalance)
Expand Down Expand Up @@ -1342,7 +1360,7 @@ var clientTests = []clientTest{

// Wait for all the updates to be populated in the
// server's database.
h.server.waitForUpdates(hints, 3*time.Second)
h.server.waitForUpdates(hints, waitTime)
},
},
{
Expand Down Expand Up @@ -2053,7 +2071,7 @@ var clientTests = []clientTest{
// Now stop the client and reset its database.
require.NoError(h.t, h.client.Stop())

db := wtmock.NewClientDB()
db := newClientDB(h.t)
h.clientDB = db
h.clientCfg.DB = db

Expand Down Expand Up @@ -2398,6 +2416,140 @@ var clientTests = []clientTest{
server2.waitForUpdates(hints[numUpdates/2:], waitTime)
},
},
{
// This test shows that if a channel is closed while an update
// for that channel still exists in an in-memory queue
// somewhere then it is handled correctly by treating it as a
// rogue update.
name: "channel closed while update is un-acked",
cfg: harnessCfg{
localBalance: localBalance,
remoteBalance: remoteBalance,
policy: wtpolicy.Policy{
TxPolicy: defaultTxPolicy,
MaxUpdates: 5,
},
},
fn: func(h *testHarness) {
const (
numUpdates = 10
chanIDInt = 0
)

h.sendUpdatesOn = true

// Advance the channel with a few updates.
hints := h.advanceChannelN(chanIDInt, numUpdates)

// Backup a few these updates and wait for them to
// arrive at the server. Note that we back up enough
// updates to saturate the session so that the session
// is considered closable when the channel is deleted.
h.backupStates(chanIDInt, 0, numUpdates/2, nil)
h.server.waitForUpdates(hints[:numUpdates/2], waitTime)

// Now, restart the server in a state where it will not
// ack updates. This will allow us to wait for an
// update to be un-acked and persisted.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = true
})

// Backup a few more of the update. These should remain
// in the client as un-acked.
h.backupStates(
chanIDInt, numUpdates/2, numUpdates-1, nil,
)

// Wait for the tasks to be bound to sessions.
fetchSessions := h.clientDB.FetchSessionCommittedUpdates
err := wait.Predicate(func() bool {
sessions, err := h.clientDB.ListClientSessions(
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
nil,
)
require.NoError(h.t, err)

var updates []wtdb.CommittedUpdate
for id := range sessions {
updates, err = fetchSessions(&id)
require.NoError(h.t, err)

if len(updates) != numUpdates-1 {
return true
}
}

return false
}, waitTime)
require.NoError(h.t, err)

// Now we close this channel while the update for it has
// not yet been acked.
h.closeChannel(chanIDInt, 1)

// Closable sessions should now be one.
err = wait.Predicate(func() bool {
cs, err := h.clientDB.ListClosableSessions()
require.NoError(h.t, err)

return len(cs) == 1
}, waitTime)
require.NoError(h.t, err)

// Now, restart the server and allow it to ack updates
// again.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = false
})

// Mine a few blocks so that the session close range is
// surpassed.
h.mine(3)

// Wait for there to be no more closable sessions on the
// client side.
err = wait.Predicate(func() bool {
cs, err := h.clientDB.ListClosableSessions()
require.NoError(h.t, err)

return len(cs) == 0
}, waitTime)
require.NoError(h.t, err)

// Wait for channel to be "unregistered".
chanID := chanIDFromInt(chanIDInt)
err = wait.Predicate(func() bool {
err := h.client.BackupState(&chanID, 0)

return errors.Is(
err, wtclient.ErrUnregisteredChannel,
)
}, waitTime)
require.NoError(h.t, err)

// Show that the committed update for the closed channel
// is cleared from the DB.
err = wait.Predicate(func() bool {
sessions, err := h.clientDB.ListClientSessions(
nil,
)
require.NoError(h.t, err)

var updates []wtdb.CommittedUpdate
for id := range sessions {
updates, err = fetchSessions(&id)
require.NoError(h.t, err)

if len(updates) != 0 {
return false
}
}

return true
}, waitTime)
require.NoError(h.t, err)
},
},
}

// TestClient executes the client test suite, asserting the ability to backup
Expand Down
Loading