Skip to content

Commit

Permalink
Fix flaky test TestShardSync (vitessio#17095)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 30, 2024
1 parent 0beb273 commit 9c9a893
Showing 1 changed file with 44 additions and 14 deletions.
58 changes: 44 additions & 14 deletions go/vt/vttablet/tabletmanager/shard_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tabletmanager

import (
"context"
"errors"
"fmt"
"reflect"
"testing"
Expand All @@ -44,7 +45,7 @@ const (
)

func TestShardSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
ts := memorytopo.NewServer(ctx, "cell1")
statsTabletTypeCount.ResetAll()
Expand All @@ -67,48 +68,77 @@ func TestShardSync(t *testing.T) {
// wait for syncing to work correctly
// this should also have updated the shard record since it is a more recent operation
// We check here that the shard record and the tablet record are in sync
checkShardRecordWithTimeout(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime, 1*time.Second)
err = checkShardRecord(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime)
require.NoError(t, err)

// Shard sync loop runs asynchronously and starts a watch on the shard.
// We wait for the shard watch to start, otherwise the test is flaky
// because the update of the record can happen before the watch is started.
waitForShardWatchToStart(ctx, t, tm, originalTime, ti)

// even if try to update the shard record with the old timestamp, it should be reverted again
updatePrimaryInfoInShardRecord(ctx, t, tm, nil, originalTime)

// this should have also updated the shard record because of the timestamp.
checkShardRecordWithTimeout(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime, 1*time.Second)
err = checkShardRecord(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime)
require.NoError(t, err)

// updating the shard record with the latest time should trigger an update in the tablet
newTime := time.Now()
updatePrimaryInfoInShardRecord(ctx, t, tm, nil, newTime)

// this should not have updated.
checkShardRecordWithTimeout(ctx, t, ts, nil, protoutil.TimeToProto(newTime), 1*time.Second)
err = checkShardRecord(ctx, t, ts, nil, protoutil.TimeToProto(newTime))
require.NoError(t, err)

// verify that the tablet record has been updated
checkTabletRecordWithTimeout(ctx, t, ts, tm.tabletAlias, topodata.TabletType_REPLICA, nil, 1*time.Second)
checkTabletRecordWithTimeout(ctx, t, ts, tm.tabletAlias, topodata.TabletType_REPLICA, nil)
}

// waitForShardWatchToStart waits for shard watch to have started.
func waitForShardWatchToStart(ctx context.Context, t *testing.T, tm *TabletManager, originalTime time.Time, ti *topo.TabletInfo) {
// We wait for shard watch to start by
// updating the record and waiting to see
// the shard record is updated back by the tablet manager.
idx := 1
for {
select {
case <-ctx.Done():
require.FailNow(t, "timed out: waiting for shard watch to start")
default:
updatePrimaryInfoInShardRecord(ctx, t, tm, nil, originalTime.Add(-1*time.Duration(idx)*time.Second))
idx = idx + 1
checkCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
err := checkShardRecord(checkCtx, t, tm.TopoServer, ti.Alias, ti.PrimaryTermStartTime)
cancel()
if err == nil {
return
}
}
}
}

func checkShardRecordWithTimeout(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, expectedStartTime *vttime.Time, timeToWait time.Duration) {
timeOut := time.After(timeToWait)
func checkShardRecord(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, expectedStartTime *vttime.Time) error {
for {
select {
case <-timeOut:
t.Fatalf("timed out: waiting for shard record to update")
case <-ctx.Done():
return errors.New("timed out: waiting for shard record to update")
default:
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
if reflect.DeepEqual(tabletAlias, si.PrimaryAlias) && reflect.DeepEqual(expectedStartTime, si.PrimaryTermStartTime) {
return
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
}

func checkTabletRecordWithTimeout(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, tabletType topodata.TabletType, expectedStartTime *vttime.Time, timeToWait time.Duration) {
timeOut := time.After(timeToWait)
func checkTabletRecordWithTimeout(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, tabletType topodata.TabletType, expectedStartTime *vttime.Time) {
for {
select {
case <-timeOut:
t.Fatalf("timed out: waiting for tablet record to update")
case <-ctx.Done():
require.FailNow(t, "timed out: waiting for tablet record to update")
default:
ti, err := ts.GetTablet(ctx, tabletAlias)
require.NoError(t, err)
Expand Down

0 comments on commit 9c9a893

Please sign in to comment.