diff --git a/go/vt/vttablet/tabletmanager/shard_sync_test.go b/go/vt/vttablet/tabletmanager/shard_sync_test.go index 40a62d06b76..8f7739e5341 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync_test.go +++ b/go/vt/vttablet/tabletmanager/shard_sync_test.go @@ -18,6 +18,7 @@ package tabletmanager import ( "context" + "errors" "fmt" "reflect" "testing" @@ -44,7 +45,7 @@ const ( ) func TestShardSync(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() ts := memorytopo.NewServer(ctx, "cell1") statsTabletTypeCount.ResetAll() @@ -67,48 +68,77 @@ func TestShardSync(t *testing.T) { // wait for syncing to work correctly // this should also have updated the shard record since it is a more recent operation // We check here that the shard record and the tablet record are in sync - checkShardRecordWithTimeout(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime, 1*time.Second) + err = checkShardRecord(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime) + require.NoError(t, err) + + // Shard sync loop runs asynchronously and starts a watch on the shard. + // We wait for the shard watch to start, otherwise the test is flaky + // because the update of the record can happen before the watch is started. + waitForShardWatchToStart(ctx, t, tm, originalTime, ti) // even if try to update the shard record with the old timestamp, it should be reverted again updatePrimaryInfoInShardRecord(ctx, t, tm, nil, originalTime) // this should have also updated the shard record because of the timestamp. - checkShardRecordWithTimeout(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime, 1*time.Second) + err = checkShardRecord(ctx, t, ts, ti.Alias, ti.PrimaryTermStartTime) + require.NoError(t, err) // updating the shard record with the latest time should trigger an update in the tablet newTime := time.Now() updatePrimaryInfoInShardRecord(ctx, t, tm, nil, newTime) // this should not have updated. - checkShardRecordWithTimeout(ctx, t, ts, nil, protoutil.TimeToProto(newTime), 1*time.Second) + err = checkShardRecord(ctx, t, ts, nil, protoutil.TimeToProto(newTime)) + require.NoError(t, err) // verify that the tablet record has been updated - checkTabletRecordWithTimeout(ctx, t, ts, tm.tabletAlias, topodata.TabletType_REPLICA, nil, 1*time.Second) + checkTabletRecordWithTimeout(ctx, t, ts, tm.tabletAlias, topodata.TabletType_REPLICA, nil) +} + +// waitForShardWatchToStart waits for shard watch to have started. +func waitForShardWatchToStart(ctx context.Context, t *testing.T, tm *TabletManager, originalTime time.Time, ti *topo.TabletInfo) { + // We wait for shard watch to start by + // updating the record and waiting to see + // the shard record is updated back by the tablet manager. + idx := 1 + for { + select { + case <-ctx.Done(): + require.FailNow(t, "timed out: waiting for shard watch to start") + default: + updatePrimaryInfoInShardRecord(ctx, t, tm, nil, originalTime.Add(-1*time.Duration(idx)*time.Second)) + idx = idx + 1 + checkCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + err := checkShardRecord(checkCtx, t, tm.TopoServer, ti.Alias, ti.PrimaryTermStartTime) + cancel() + if err == nil { + return + } + } + } } -func checkShardRecordWithTimeout(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, expectedStartTime *vttime.Time, timeToWait time.Duration) { - timeOut := time.After(timeToWait) +func checkShardRecord(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, expectedStartTime *vttime.Time) error { for { select { - case <-timeOut: - t.Fatalf("timed out: waiting for shard record to update") + case <-ctx.Done(): + return errors.New("timed out: waiting for shard record to update") default: si, err := ts.GetShard(ctx, keyspace, shard) require.NoError(t, err) if reflect.DeepEqual(tabletAlias, si.PrimaryAlias) && reflect.DeepEqual(expectedStartTime, si.PrimaryTermStartTime) { - return + return nil } time.Sleep(100 * time.Millisecond) } } } -func checkTabletRecordWithTimeout(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, tabletType topodata.TabletType, expectedStartTime *vttime.Time, timeToWait time.Duration) { - timeOut := time.After(timeToWait) +func checkTabletRecordWithTimeout(ctx context.Context, t *testing.T, ts *topo.Server, tabletAlias *topodata.TabletAlias, tabletType topodata.TabletType, expectedStartTime *vttime.Time) { for { select { - case <-timeOut: - t.Fatalf("timed out: waiting for tablet record to update") + case <-ctx.Done(): + require.FailNow(t, "timed out: waiting for tablet record to update") default: ti, err := ts.GetTablet(ctx, tabletAlias) require.NoError(t, err)