Skip to content

Commit

Permalink
test: add failing test to ensure heartbeat writer is indefinitely stu…
Browse files Browse the repository at this point in the history
…ck being closed if blocked on a semi-sync ack

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Dec 19, 2023
1 parent 91bb0b9 commit 85c278a
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package repltracker

import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -65,6 +67,48 @@ func TestWriteHeartbeatError(t *testing.T) {
assert.Equal(t, int64(1), writeErrors.Get())
}

// TestCloseWhileStuckWriting tests that Close shouldn't get stuck even if the heartbeat writer is stuck waiting for a semi-sync ACK.
func TestCloseWhileStuckWriting(t *testing.T) {
db := fakesqldb.New(t)
tw := newTestWriter(db, nil)
tw.isOpen = true

killWg := sync.WaitGroup{}
killWg.Add(1)
startedWaitWg := sync.WaitGroup{}
startedWaitWg.Add(1)

// Insert a query pattern that causes the upsert to block indefinitely until it has been killed.
// This simulates a stuck primary write due to a semi-sync ACK requirement.
db.AddQueryPatternWithCallback(`INSERT INTO .*heartbeat \(ts, tabletUid, keyspaceShard\).*`, &sqltypes.Result{}, func(s string) {
startedWaitWg.Done()
killWg.Wait()
})

// When we receive a kill query, we want to finish running the wait group to unblock the upsert query.
db.AddQueryPatternWithCallback("kill", &sqltypes.Result{}, func(s string) {
killWg.Done()
})

// Now we enable writes, but the first write will get blocked.
tw.enableWrites(true)
// We wait until the write has blocked to ensure we only call Close after we are stuck writing.
startedWaitWg.Wait()
// Even if the write is blocked, we should be able to disable writes without waiting indefinitely.
// This is what we call, when we try to Close the heartbeat writer.
ctx, cancel := context.WithCancel(context.Background())
go func() {
tw.enableWrites(false)
cancel()
}()
select {
case <-ctx.Done():
db.Close()
case <-time.After(10 * time.Second):
t.Fatalf("Timed out waiting for heartbeat writer to close")
}
}

func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter {
config := tabletenv.NewDefaultConfig()
config.ReplicationTracker.Mode = tabletenv.Heartbeat
Expand Down

0 comments on commit 85c278a

Please sign in to comment.