From d4fefda10a355ff705bb812f0bcdbd6df7bc2572 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Mon, 15 Jan 2024 16:50:12 +0200
Subject: [PATCH 1/3] wtclient: demonstrate overflow queue flake

In this commit, some temporary variables and logic are added to the
DiskOverflowQueue for easy stop/go control from unit tests. This is
then used to write a temporary unit test that demonstrates a race
condition that can cause the queue to be in disk mode when it should be
in memory mode. This new code & test will be removed after the issue
has been fixed.
---
 watchtower/wtclient/queue.go      | 81 +++++++++++++++++++++++---
 watchtower/wtclient/queue_test.go | 95 +++++++++++++++++++++++++++++++
 2 files changed, 168 insertions(+), 8 deletions(-)

diff --git a/watchtower/wtclient/queue.go b/watchtower/wtclient/queue.go
index 9e680c12a4..7f61336b7b 100644
--- a/watchtower/wtclient/queue.go
+++ b/watchtower/wtclient/queue.go
@@ -92,6 +92,21 @@ type DiskOverflowQueue[T any] struct {
 	// task.
 	leftOverItem3 *T
 
+	// waitForDiskWriteSignal is true if blockDiskWrite should be used to
+	// wait for an explicit signal before writing an item to disk. This is
+	// for the purpose of testing only and will be removed after the fix of
+	// a test flake has been demonstrated.
+	waitForDiskWriteSignal bool
+	blockDiskWrite         chan struct{}
+
+	// waitForFeedOutputSignal is true if startFeedOutputChan should be used
+	// to wait for an explicit signal before allowing the feedOutput
+	// goroutine to begin its duties. This is for the purpose of testing
+	// only and will be removed after the fix of a test flake has been
+	// demonstrated.
+	waitForFeedOutputSignal bool
+	startFeedOutputChan     chan struct{}
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -106,14 +121,16 @@ func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64,
 	}
 
 	q := &DiskOverflowQueue[T]{
-		log:               logger,
-		db:                db,
-		inputList:         list.New(),
-		newDiskItemSignal: make(chan struct{}, 1),
-		inputChan:         make(chan *internalTask[T]),
-		memQueue:          make(chan T, maxQueueSize-2),
-		outputChan:        make(chan T),
-		quit:              make(chan struct{}),
+		log:                 logger,
+		db:                  db,
+		inputList:           list.New(),
+		newDiskItemSignal:   make(chan struct{}, 1),
+		inputChan:           make(chan *internalTask[T]),
+		memQueue:            make(chan T, maxQueueSize-2),
+		outputChan:          make(chan T),
+		blockDiskWrite:      make(chan struct{}, 1),
+		startFeedOutputChan: make(chan struct{}, 1),
+		quit:                make(chan struct{}),
 	}
 
 	q.inputListCond = sync.NewCond(&q.inputListMu)
@@ -130,6 +147,27 @@ func (q *DiskOverflowQueue[T]) Start() error {
 	return err
 }
 
+// allowDiskWrite is used to explicitly signal that a disk write may take place.
+// This is for the purposes of testing only and will be removed once a specific
+// test flake fix has been demonstrated.
+func (q *DiskOverflowQueue[T]) allowDiskWrite() {
+	select {
+	case q.blockDiskWrite <- struct{}{}:
+	default:
+	}
+}
+
+// startFeedOutput is used to explicitly signal that the feedOutput goroutine
+// may start.
+// This is for the purposes of testing only and will be removed once a specific
+// test flake fix has been demonstrated.
+func (q *DiskOverflowQueue[T]) startFeedOutput() {
+	select {
+	case q.startFeedOutputChan <- struct{}{}:
+	default:
+	}
+}
+
 // start kicks off all the goroutines that are required to manage the queue.
 func (q *DiskOverflowQueue[T]) start() error {
 	numDisk, err := q.db.Len()
@@ -345,6 +383,21 @@ func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool {
 	// If the queue is in disk mode then any new items should be put
 	// straight into the disk queue.
 	if q.toDisk.Load() {
+
+		// If waitForDiskWriteSignal is true, then we wait here for
+		// an explicit signal before writing the item to disk. This is
+		// for testing only and will be removed once a specific test
+		// flake fix has been demonstrated.
+		if q.waitForDiskWriteSignal {
+			select {
+			case <-q.blockDiskWrite:
+			case <-q.quit:
+				q.leftOverItem3 = &task
+
+				return false
+			}
+		}
+
 		err := q.db.Push(task)
 		if err != nil {
 			// Log and back off for a few seconds and then
@@ -545,6 +598,18 @@ func (q *DiskOverflowQueue[T]) feedOutputChan() {
 		q.wg.Done()
 	}()
 
+	// If waitForFeedOutputSignal is true, then we wait here for an explicit
+	// signal before starting the main loop of the function. This is for
+	// testing only and will be removed once a specific test flake fix has
+	// been demonstrated.
+	if q.waitForFeedOutputSignal {
+		select {
+		case <-q.startFeedOutputChan:
+		case <-q.quit:
+			return
+		}
+	}
+
 	for {
 		select {
 		case nextTask, ok := <-q.memQueue:
diff --git a/watchtower/wtclient/queue_test.go b/watchtower/wtclient/queue_test.go
index 529acb49a4..7adb5a9bfd 100644
--- a/watchtower/wtclient/queue_test.go
+++ b/watchtower/wtclient/queue_test.go
@@ -38,6 +38,10 @@ func TestDiskOverflowQueue(t *testing.T) {
 			name: "start stop queue",
 			run:  testStartStopQueue,
 		},
+		{
+			name: "demo flake",
+			run:  demoFlake,
+		},
 	}
 
 	initDB := func() wtdb.Queue[*wtdb.BackupID] {
@@ -71,6 +75,97 @@ func TestDiskOverflowQueue(t *testing.T) {
 	}
 }
 
+// demoFlake is a temporary test demonstrating a race condition that currently
+// exists in the DiskOverflowQueue. It contrives a scenario that results in
+// the queue being in memory mode when it really should be in disk mode.
+func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
+	// Generate some backup IDs that we want to add to the queue.
+	tasks := genBackupIDs(10)
+
+	// New mock logger.
+	log := newMockLogger(t.Logf)
+
+	// Init the queue with the mock DB.
+	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
+		db, maxInMemItems, log,
+	)
+	require.NoError(t, err)
+
+	// Set the two test variables to true so that we have more control over
+	// the feedOutput goroutine and disk writes from the test.
+	q.waitForDiskWriteSignal = true
+	q.waitForFeedOutputSignal = true
+
+	// Start the queue.
+	require.NoError(t, q.Start())
+
+	// Initially there should be no items on disk.
+	assertNumDisk(t, db, 0)
+
+	// Start filling up the queue. The maxInMemItems is 5, meaning that the
+	// memQueue capacity is 3. Since the feedOutput goroutine has not yet
+	// started, these first 3 items (tasks 0-2) will fill the memQueue.
+	enqueue(t, q, tasks[0])
+	enqueue(t, q, tasks[1])
+	enqueue(t, q, tasks[2])
+
+	// Adding task 3 is expected to result in the mode being changed to disk
+	// mode.
+	enqueue(t, q, tasks[3])
+
+	// Show that the mode does actually change to disk mode.
+	err = wait.Predicate(func() bool {
+		return q.toDisk.Load()
+	}, waitTime)
+	require.NoError(t, err)
+
+	// Allow task 3 to be written to disk. This will send a signal on
+	// newDiskItemSignal.
+	q.allowDiskWrite()
+
+	// Task 3 will almost immediately be popped from disk again due to
+	// the newDiskItemSignal causing feedMemQueue to call feedFromDisk.
+	waitForNumDisk(t, db, 0)
+
+	// Enqueue task 4 but don't allow it to be written to disk yet.
+	enqueue(t, q, tasks[4])
+
+	// Wait a bit just to make sure that task 4 has passed the
+	// if q.toDisk.Load() check in pushToActiveQueue and is waiting on the
+	// allowDiskWrite signal.
+	time.Sleep(time.Second)
+
+	// Now, start the feedOutput goroutine. This will pop one item from
+	// the memQueue, meaning that feedMemQueue will now manage to push an
+	// item onto the memQueue and will move on to the next iteration of
+	// feedFromDisk, which will then see that there are no items on disk
+	// and so will change the mode to memory mode.
+	q.startFeedOutput()
+
+	err = wait.Predicate(func() bool {
+		return !q.toDisk.Load()
+	}, waitTime)
+	require.NoError(t, err)
+
+	// Now, we allow task 4 to be written to disk. NOTE that this is
+	// happening while the mode is memory mode! This is the bug! This will
+	// result in a newDiskItemSignal being sent meaning that feedMemQueue
+	// will read from disk and block on pushing the new item to memQueue.
+	q.allowDiskWrite()
+
+	// Now, if we enqueue task 5, it will _not_ be written to disk since the
+	// queue is currently in memory mode. This is the bug that needs to be
+	// fixed.
+	enqueue(t, q, tasks[5])
+	q.allowDiskWrite()
+
+	// Show that there are no items on disk at this point. When the bug is
+	// fixed, this should be changed to 1.
+	waitForNumDisk(t, db, 0)
+
+	require.NoError(t, q.Stop())
+}
+
 // testOverflowToDisk is a basic test that ensures that the queue correctly
 // overflows items to disk and then correctly reloads them.
 func testOverflowToDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {

From 3d201dde505ea8109cce3d6a4475f61b349c9df4 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Fri, 12 Jan 2024 10:19:33 +0200
Subject: [PATCH 2/3] wtclient: ensure correct disk mode for overflow queue

Before this commit, in the watchtower client's DiskOverflowQueue, there
could be a situation _if no consumer was reading from the queue_ that
could lead to an in-memory build-up of backup tasks instead of the
expected disk overflow. This commit fixes this by ensuring that if a
task is read from disk, the mode is set to disk mode so that any new
items are persisted to disk in this scenario.

The unit test added in the previous commit is updated here to show that
the fix does in fact resolve the flake.
---
 docs/release-notes/release-notes-0.18.0.md |  5 +++-
 watchtower/wtclient/queue.go               |  7 +++++
 watchtower/wtclient/queue_test.go          | 30 ++++++++++++----------
 3 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/docs/release-notes/release-notes-0.18.0.md b/docs/release-notes/release-notes-0.18.0.md
index b3a388052d..4fcda2e4e0 100644
--- a/docs/release-notes/release-notes-0.18.0.md
+++ b/docs/release-notes/release-notes-0.18.0.md
@@ -49,7 +49,7 @@
   same exclusive group](https://github.com/lightningnetwork/lnd/pull/7800).

   When using neutrino as a backend, unconfirmed transactions have to be removed
   from the wallet when a conflicting tx is confirmed. For other backends
-  these unconfirmed transactions are already removed. In addition a new
+  these unconfirmed transactions are already removed. In addition, a new
   walletrpc endpoint `RemoveTransaction` is introduced which lets one easily
   remove unconfirmed transactions manually.
@@ -61,6 +61,9 @@
   this buffer which can be used to increase the commitment fee and it also
   protects against the case where htlcs are added asynchronously resulting in
   stuck channels.
+
+* [Fixed](https://github.com/lightningnetwork/lnd/pull/8377) a watchtower
+  client test flake that prevented new tasks from overflowing to disk.

 * [Properly handle un-acked updates for exhausted watchtower
   sessions](https://github.com/lightningnetwork/lnd/pull/8233)

diff --git a/watchtower/wtclient/queue.go b/watchtower/wtclient/queue.go
index 7f61336b7b..f2e660457e 100644
--- a/watchtower/wtclient/queue.go
+++ b/watchtower/wtclient/queue.go
@@ -529,6 +529,13 @@ func (q *DiskOverflowQueue[T]) feedMemQueue() {
 			}
 		}
 
+		// If we did manage to fetch a task from disk, we make
+		// sure to set the toDisk mode to true since we may
+		// block indefinitely while trying to push the tasks to
+		// the memQueue, in which case we want the drainInputList
+		// goroutine to write any new tasks to disk.
+		q.toDisk.Store(true)
+
 		for i, task := range tasks {
 			select {
 			case q.memQueue <- task:
diff --git a/watchtower/wtclient/queue_test.go b/watchtower/wtclient/queue_test.go
index 7adb5a9bfd..e87ca14808 100644
--- a/watchtower/wtclient/queue_test.go
+++ b/watchtower/wtclient/queue_test.go
@@ -75,9 +75,9 @@ func TestDiskOverflowQueue(t *testing.T) {
 	}
 }
 
-// demoFlake is a temporary test demonstrating a race condition that currently
-// exists in the DiskOverflowQueue. It contrives a scenario that results in
-// the queue being in memory mode when it really should be in disk mode.
+// demoFlake is a temporary test demonstrating the fix of a race condition that
+// existed in the DiskOverflowQueue. It contrives a scenario that once resulted
+// in the queue being in memory mode when it really should be in disk mode.
 func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
 	// Generate some backup IDs that we want to add to the queue.
 	tasks := genBackupIDs(10)
@@ -147,21 +147,25 @@ func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
 	}, waitTime)
 	require.NoError(t, err)
 
-	// Now, we allow task 4 to be written to disk. NOTE that this is
-	// happening while the mode is memory mode! This is the bug! This will
-	// result in a newDiskItemSignal being sent meaning that feedMemQueue
-	// will read from disk and block on pushing the new item to memQueue.
+	// Now, we allow task 4 to be written to disk. This will result in a
+	// newDiskItemSignal being sent meaning that feedMemQueue will read
+	// from disk and block on pushing the new item to memQueue.
 	q.allowDiskWrite()
 
-	// Now, if we enqueue task 5, it will _not_ be written to disk since the
-	// queue is currently in memory mode. This is the bug that needs to be
-	// fixed.
+	// The above will result in feedMemQueue switching the mode to disk mode.
+	err = wait.Predicate(func() bool {
+		return q.toDisk.Load()
+	}, waitTime)
+	require.NoError(t, err)
+
+	// Now, if we enqueue task 5, it _will_ be written to disk since the
+	// queue is currently in disk mode.
 	enqueue(t, q, tasks[5])
 	q.allowDiskWrite()
 
-	// Show that there are no items on disk at this point. When the bug is
-	// fixed, this should be changed to 1.
-	waitForNumDisk(t, db, 0)
+	// Show that there is an item on disk at this point. This demonstrates
+	// that the bug has been fixed.
+	waitForNumDisk(t, db, 1)
 
 	require.NoError(t, q.Stop())
 }

From 65de80be7d21d031dd65fde361c5c8efa02de710 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Mon, 15 Jan 2024 16:58:42 +0200
Subject: [PATCH 3/3] wtclient: remove temporary bug demonstration logic and
 test

This commit removes the temporary members that were added to the
DiskOverflowQueue to make it easier to demonstrate a bug that
previously existed.
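
For reference, the behavioural fix from the previous commit is kept:
feedMemQueue flips the queue into disk mode before it can block while
pushing disk tasks onto the memQueue. A simplified sketch of that
ordering (paraphrased from this series' diff; the quit handling and
surrounding loop details are abbreviated here):

    // After tasks have been popped from the disk queue:
    q.toDisk.Store(true) // flip mode before the potentially blocking push

    for _, task := range tasks {
        select {
        case q.memQueue <- task: // may block until a consumer reads
        case <-q.quit:
            return
        }
    }

With toDisk set first, the drainInputList goroutine observes disk mode
and persists any newly enqueued tasks instead of buffering them in
memory.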
--- watchtower/wtclient/queue.go | 81 +++---------------------- watchtower/wtclient/queue_test.go | 99 ------------------------------- 2 files changed, 8 insertions(+), 172 deletions(-) diff --git a/watchtower/wtclient/queue.go b/watchtower/wtclient/queue.go index f2e660457e..8581ea888f 100644 --- a/watchtower/wtclient/queue.go +++ b/watchtower/wtclient/queue.go @@ -92,21 +92,6 @@ type DiskOverflowQueue[T any] struct { // task. leftOverItem3 *T - // waitForDiskWriteSignal is true if blockDiskWrite should be used to - // wait for an explicit signal before writing an item to disk. This is - // for the purpose of testing only and will be removed after the fix of - // a test flake has been demonstrated. - waitForDiskWriteSignal bool - blockDiskWrite chan struct{} - - // waitForFeedOutputSignal is true if startFeedOutputChan should be used - // to wait for an explicit signal before allowing the feedOutput - // goroutine to begin its duties. This is for the purpose of testing - // only and will be removed after the fix of a test flake has been - // demonstrated. - waitForFeedOutputSignal bool - startFeedOutputChan chan struct{} - quit chan struct{} wg sync.WaitGroup } @@ -121,16 +106,14 @@ func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64, } q := &DiskOverflowQueue[T]{ - log: logger, - db: db, - inputList: list.New(), - newDiskItemSignal: make(chan struct{}, 1), - inputChan: make(chan *internalTask[T]), - memQueue: make(chan T, maxQueueSize-2), - outputChan: make(chan T), - blockDiskWrite: make(chan struct{}, 1), - startFeedOutputChan: make(chan struct{}, 1), - quit: make(chan struct{}), + log: logger, + db: db, + inputList: list.New(), + newDiskItemSignal: make(chan struct{}, 1), + inputChan: make(chan *internalTask[T]), + memQueue: make(chan T, maxQueueSize-2), + outputChan: make(chan T), + quit: make(chan struct{}), } q.inputListCond = sync.NewCond(&q.inputListMu) @@ -147,27 +130,6 @@ func (q *DiskOverflowQueue[T]) Start() error { return err } -// allowDiskWrite is used to explicitly signal that a disk write may take place. -// This is for the purposes of testing only and will be removed once a specific -// test flake fix has been demonstrated. -func (q *DiskOverflowQueue[T]) allowDiskWrite() { - select { - case q.blockDiskWrite <- struct{}{}: - default: - } -} - -// startFeedOutput is used to explicitly signal that the feedOutput goroutine -// may start. -// This is for the purposes of testing only and will be removed once a specific -// test flake fix has been demonstrated. -func (q *DiskOverflowQueue[T]) startFeedOutput() { - select { - case q.startFeedOutputChan <- struct{}{}: - default: - } -} - // start kicks off all the goroutines that are required to manage the queue. func (q *DiskOverflowQueue[T]) start() error { numDisk, err := q.db.Len() @@ -383,21 +345,6 @@ func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool { // If the queue is in disk mode then any new items should be put // straight into the disk queue. if q.toDisk.Load() { - - // If waitForDiskWriteSignal is true, then we wait here for - // an explicit signal before writing the item to disk. This is - // for testing only and will be removed once a specific test - // flake fix has been demonstrated. 
-		if q.waitForDiskWriteSignal {
-			select {
-			case <-q.blockDiskWrite:
-			case <-q.quit:
-				q.leftOverItem3 = &task
-
-				return false
-			}
-		}
-
 		err := q.db.Push(task)
 		if err != nil {
 			// Log and back off for a few seconds and then
@@ -605,18 +552,6 @@ func (q *DiskOverflowQueue[T]) feedOutputChan() {
 		q.wg.Done()
 	}()
 
-	// If waitForFeedOutputSignal is true, then we wait here for an explicit
-	// signal before starting the main loop of the function. This is for
-	// testing only and will be removed once a specific test flake fix has
-	// been demonstrated.
-	if q.waitForFeedOutputSignal {
-		select {
-		case <-q.startFeedOutputChan:
-		case <-q.quit:
-			return
-		}
-	}
-
 	for {
 		select {
 		case nextTask, ok := <-q.memQueue:
diff --git a/watchtower/wtclient/queue_test.go b/watchtower/wtclient/queue_test.go
index e87ca14808..529acb49a4 100644
--- a/watchtower/wtclient/queue_test.go
+++ b/watchtower/wtclient/queue_test.go
@@ -38,10 +38,6 @@ func TestDiskOverflowQueue(t *testing.T) {
 			name: "start stop queue",
 			run:  testStartStopQueue,
 		},
-		{
-			name: "demo flake",
-			run:  demoFlake,
-		},
 	}
 
 	initDB := func() wtdb.Queue[*wtdb.BackupID] {
@@ -75,101 +71,6 @@ func TestDiskOverflowQueue(t *testing.T) {
 	}
 }
 
-// demoFlake is a temporary test demonstrating the fix of a race condition that
-// existed in the DiskOverflowQueue. It contrives a scenario that once resulted
-// in the queue being in memory mode when it really should be in disk mode.
-func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
-	// Generate some backup IDs that we want to add to the queue.
-	tasks := genBackupIDs(10)
-
-	// New mock logger.
-	log := newMockLogger(t.Logf)
-
-	// Init the queue with the mock DB.
-	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
-		db, maxInMemItems, log,
-	)
-	require.NoError(t, err)
-
-	// Set the two test variables to true so that we have more control over
-	// the feedOutput goroutine and disk writes from the test.
-	q.waitForDiskWriteSignal = true
-	q.waitForFeedOutputSignal = true
-
-	// Start the queue.
-	require.NoError(t, q.Start())
-
-	// Initially there should be no items on disk.
-	assertNumDisk(t, db, 0)
-
-	// Start filling up the queue. The maxInMemItems is 5, meaning that the
-	// memQueue capacity is 3. Since the feedOutput goroutine has not yet
-	// started, these first 3 items (tasks 0-2) will fill the memQueue.
-	enqueue(t, q, tasks[0])
-	enqueue(t, q, tasks[1])
-	enqueue(t, q, tasks[2])
-
-	// Adding task 3 is expected to result in the mode being changed to disk
-	// mode.
-	enqueue(t, q, tasks[3])
-
-	// Show that the mode does actually change to disk mode.
-	err = wait.Predicate(func() bool {
-		return q.toDisk.Load()
-	}, waitTime)
-	require.NoError(t, err)
-
-	// Allow task 3 to be written to disk. This will send a signal on
-	// newDiskItemSignal.
-	q.allowDiskWrite()
-
-	// Task 3 will almost immediately be popped from disk again due to
-	// the newDiskItemSignal causing feedMemQueue to call feedFromDisk.
-	waitForNumDisk(t, db, 0)
-
-	// Enqueue task 4 but don't allow it to be written to disk yet.
-	enqueue(t, q, tasks[4])
-
-	// Wait a bit just to make sure that task 4 has passed the
-	// if q.toDisk.Load() check in pushToActiveQueue and is waiting on the
-	// allowDiskWrite signal.
-	time.Sleep(time.Second)
-
-	// Now, start the feedOutput goroutine. This will pop one item from
-	// the memQueue, meaning that feedMemQueue will now manage to push an
-	// item onto the memQueue and will move on to the next iteration of
-	// feedFromDisk, which will then see that there are no items on disk
-	// and so will change the mode to memory mode.
-	q.startFeedOutput()
-
-	err = wait.Predicate(func() bool {
-		return !q.toDisk.Load()
-	}, waitTime)
-	require.NoError(t, err)
-
-	// Now, we allow task 4 to be written to disk. This will result in a
-	// newDiskItemSignal being sent meaning that feedMemQueue will read
-	// from disk and block on pushing the new item to memQueue.
-	q.allowDiskWrite()
-
-	// The above will result in feedMemQueue switching the mode to disk mode.
-	err = wait.Predicate(func() bool {
-		return q.toDisk.Load()
-	}, waitTime)
-	require.NoError(t, err)
-
-	// Now, if we enqueue task 5, it _will_ be written to disk since the
-	// queue is currently in disk mode.
-	enqueue(t, q, tasks[5])
-	q.allowDiskWrite()
-
-	// Show that there is an item on disk at this point. This demonstrates
-	// that the bug has been fixed.
-	waitForNumDisk(t, db, 1)
-
-	require.NoError(t, q.Stop())
-}
-
 // testOverflowToDisk is a basic test that ensures that the queue correctly
 // overflows items to disk and then correctly reloads them.
 func testOverflowToDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
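
The pattern fixed by this series generalizes: any overflow queue with an
atomic mode flag must publish "disk mode" before a refill path can block,
or a concurrent producer may observe "memory mode" and skip persisting.
The following is a minimal, self-contained sketch of that invariant (not
lnd code; the names, the int task type, and the single-threaded driver in
main are illustrative only, and locking around the disk slice is elided):

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	type overflowQueue struct {
		toDisk   atomic.Bool // true => producers must persist to disk
		memQueue chan int    // bounded in-memory buffer
		disk     []int       // stand-in for the persisted queue
	}

	// refill moves items read from disk into memQueue, flipping the
	// mode first so that producers overflow to disk while this call
	// may be blocked on the full channel.
	func (q *overflowQueue) refill(tasks []int) {
		q.toDisk.Store(true) // publish disk mode before blocking
		for _, t := range tasks {
			q.memQueue <- t // may block until a consumer reads
		}
		q.toDisk.Store(false)
	}

	// push is the producer-side check that races with refill.
	func (q *overflowQueue) push(t int) {
		if q.toDisk.Load() {
			q.disk = append(q.disk, t) // overflow to disk
			return
		}
		select {
		case q.memQueue <- t:
		default:
			q.disk = append(q.disk, t) // memQueue full
		}
	}

	func main() {
		q := &overflowQueue{memQueue: make(chan int, 3)}
		q.refill([]int{1, 2, 3}) // fills memQueue without blocking
		q.push(4)                // memQueue is full, so this overflows to disk
		fmt.Println("in memory:", len(q.memQueue), "on disk:", len(q.disk))
	}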