diff --git a/go/vt/vttablet/tabletserver/repltracker/writer_test.go b/go/vt/vttablet/tabletserver/repltracker/writer_test.go index fade8918114..d9957940ea8 100644 --- a/go/vt/vttablet/tabletserver/repltracker/writer_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/writer_test.go @@ -17,7 +17,9 @@ limitations under the License. package repltracker import ( + "context" "fmt" + "sync" "testing" "time" @@ -65,6 +67,48 @@ func TestWriteHeartbeatError(t *testing.T) { assert.Equal(t, int64(1), writeErrors.Get()) } +// TestCloseWhileStuckWriting tests that Close shouldn't get stuck even if the heartbeat writer is stuck waiting for a semi-sync ACK. +func TestCloseWhileStuckWriting(t *testing.T) { + db := fakesqldb.New(t) + tw := newTestWriter(db, nil) + tw.isOpen = true + + killWg := sync.WaitGroup{} + killWg.Add(1) + startedWaitWg := sync.WaitGroup{} + startedWaitWg.Add(1) + + // Insert a query pattern that causes the upsert to block indefinitely until it has been killed. + // This simulates a stuck primary write due to a semi-sync ACK requirement. + db.AddQueryPatternWithCallback(`INSERT INTO .*heartbeat \(ts, tabletUid, keyspaceShard\).*`, &sqltypes.Result{}, func(s string) { + startedWaitWg.Done() + killWg.Wait() + }) + + // When we receive a kill query, we want to finish running the wait group to unblock the upsert query. + db.AddQueryPatternWithCallback("kill", &sqltypes.Result{}, func(s string) { + killWg.Done() + }) + + // Now we enable writes, but the first write will get blocked. + tw.enableWrites(true) + // We wait until the write has blocked to ensure we only call Close after we are stuck writing. + startedWaitWg.Wait() + // Even if the write is blocked, we should be able to disable writes without waiting indefinitely. + // This is what we call, when we try to Close the heartbeat writer. + ctx, cancel := context.WithCancel(context.Background()) + go func() { + tw.enableWrites(false) + cancel() + }() + select { + case <-ctx.Done(): + db.Close() + case <-time.After(10 * time.Second): + t.Fatalf("Timed out waiting for heartbeat writer to close") + } +} + func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter { config := tabletenv.NewDefaultConfig() config.ReplicationTracker.Mode = tabletenv.Heartbeat