From c052bc273280439e8ceb0ab39325f0b38920de9c Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Mon, 16 Oct 2023 16:13:41 -0700 Subject: [PATCH] change ignore list to variadic arg, create new child context with shorter timeout Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 12 ++++--- go/vt/discovery/tablet_picker_test.go | 33 +++++++++---------- go/vt/vtctl/workflow/utils.go | 2 +- go/vt/vtgate/vstream_manager.go | 30 +++++++++++------ go/vt/vtgate/vstream_manager_test.go | 7 ---- .../tabletmanager/vdiff/table_differ.go | 2 +- .../tabletmanager/vreplication/controller.go | 2 +- go/vt/wrangler/traffic_switcher.go | 2 +- go/vt/wrangler/vdiff.go | 4 +-- tools/rowlog/rowlog.go | 1 - 10 files changed, 48 insertions(+), 47 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 6eaebdcac1b..b95e08120f2 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "math/rand" + "slices" "sort" "strings" "sync" @@ -135,7 +136,7 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo - ignoreTablets map[string]*topodatapb.TabletAlias + ignoreTablets []string } // NewTabletPicker returns a TabletPicker. @@ -145,7 +146,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, - ignoreTablets map[string]*topodatapb.TabletAlias, + ignoreTablets ...string, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. if tabletTypesStr == "" { @@ -419,7 +420,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { - tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] + tabletAliasString := topoproto.TabletAliasString(tabletAlias) + tabletInfo, ok := tabletMap[tabletAliasString] if !ok { // Either tablet disappeared on us, or we got a partial result // (GetTabletMap ignores topo.ErrNoNode); just log a warning. @@ -436,8 +438,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - // if this tablet is not in the ignore list, then add it as a candidate - if _, ok := tp.ignoreTablets[tabletInfo.GetAlias().String()]; !ok { + // If this tablet is not in the ignore list, then add it as a candidate. + if !slices.Contains(tp.ignoreTablets, tabletAliasString) { tablets = append(tablets, tabletInfo) } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 0e8d50edee9..9191d161626 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topo/topoproto" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -47,7 +48,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond) @@ -284,7 +285,7 @@ func TestPickLocalPreferences(t *testing.T) { deleteTablet(t, te, tab) } }() - tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options) require.NoError(t, err) require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) require.ElementsMatch(t, tp.cells, tcase.tpCells) @@ -313,7 +314,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) { defer deleteTablet(t, te, want1) // Local cell preference is default - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -348,7 +349,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) // create a tablet in the other cell, it should be picked @@ -370,7 +371,7 @@ func TestPickUsingCellAsAlias(t *testing.T) { // added to the alias. te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) // Create a tablet in one of the main cells, it should be @@ -399,14 +400,11 @@ func TestPickWithIgnoreList(t *testing.T) { want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) defer deleteTablet(t, te, want) - noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) - defer deleteTablet(t, te, noWant) - - ignoreTablets := make(map[string]*topodatapb.TabletAlias) - ignoreTablets[noWant.Alias.String()] = noWant.GetAlias() + dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, dontWant) // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, topoproto.TabletAliasString(dontWant.GetAlias())) require.NoError(t, err) // Try it many times to be sure we don't ever pick from the ignore list @@ -425,7 +423,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) { want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -468,7 +466,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -498,11 +496,10 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) { ctx := utils.LeakCheckContext(t) te := newPickerTestEnv(t, ctx, []string{"cell"}) - var ignoreTablets map[string]*topodatapb.TabletAlias - _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets) + _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -530,7 +527,7 @@ func TestPickErrorOnlySpecified(t *testing.T) { te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -586,7 +583,7 @@ func TestPickFallbackType(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]*topodatapb.TabletAlias)) + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) defer cancel2() diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 1c5c07d90fb..1a723c6192c 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -626,7 +626,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index c8694606ec2..734dfaf6f2e 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -30,6 +30,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -474,7 +475,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} - ignoreTablets := make(map[string]*topodatapb.TabletAlias) + ignoreTablets := []string{} errCount := 0 for { @@ -492,12 +493,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets) + + // Create a child context with a stricter timeout. + tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) + defer tpCancel() + + tp, err := discovery.NewTabletPicker(tpCtx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) if err != nil { log.Errorf(err.Error()) return err } - tablet, err := tp.PickForStreaming(ctx) + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) return err @@ -673,16 +679,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - retry, ignoreTablet := vs.isRetriableError(err) + retry, ignoreTablet := vs.shouldRetry(err) if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } if ignoreTablet { - ignoreTablets[tablet.Alias.String()] = tablet.GetAlias() + ignoreTablets = append(ignoreTablets, topoproto.TabletAliasString(tablet.GetAlias())) } errCount++ + // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) return err @@ -691,20 +698,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } -// isRetriable determines whether we should exit immediately or retry the vstream. -// The first return value determines if the error is retriable, the second indicates whether +// shouldRetry determines whether we should exit immediately or retry the vstream. +// The first return value determines if the error can be retried, the second indicates whether // the tablet on which the error occurred should be ommitted from the candidate list of tablets // to choose from on the retry. -func (vs *vstream) isRetriableError(err error) (bool, bool) { +// +// An error should be retried if it is expected to be transient. +// A tablet should be ignored upon retry if it's likely another tablet will succeed without the same error. +func (vs *vstream) shouldRetry(err error) (bool, bool) { errCode := vterrors.Code(err) if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { return true, false } - // If there is a GTIDSet Mismatch on the tablet or if the tablet cannot be found, + // If there is a GTIDSet Mismatch on the tablet, // omit it from the candidate list in the TabletPicker on retry. - if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND { + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { return true, true } diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 74c975e58a6..55575be1db4 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -410,13 +410,6 @@ func TestVStreamRetriableErrors(t *testing.T) { shouldRetry: true, ignoreTablet: true, }, - { - name: "not found", - code: vtrpcpb.Code_NOT_FOUND, - msg: "", - shouldRetry: true, - ignoreTablet: true, - }, { name: "unavailable", code: vtrpcpb.Code_UNAVAILABLE, diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index e2cad685e2c..e65a0bad253 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { } func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodata.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 7c72243ab2b..94e4741eeee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return nil, err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index a2ffa1a1675..654a5bd1588 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -453,7 +453,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index c18e51cb93e..85c82bb3574 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } @@ -828,7 +828,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { go func() { defer wg.Done() err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error { - tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias)) + tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) if err != nil { return err } diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 7e11ba0886e..475006b2b59 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -389,7 +389,6 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st discovery.TabletPickerOptions{ CellPreference: "OnlySpecified", }, - make(map[string]*topodatapb.TabletAlias), ) if err != nil { return ""