diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 99d95848d19..272d765a69e 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -135,6 +135,7 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo + ignoreTablets map[string]topodatapb.TabletAlias } // NewTabletPicker returns a TabletPicker. @@ -144,6 +145,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, + ignoreTablets map[string]topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. if tabletTypesStr == "" { @@ -228,6 +230,7 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, + ignoreTablets: ignoreTablets, }, nil } @@ -433,7 +436,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - tablets = append(tablets, tabletInfo) + // if this tablet is not in the ignore list, then add it as a candidate + if _, ok := tp.ignoreTablets[tabletInfo.Alias.String()]; !ok { + tablets = append(tablets, tabletInfo) + } } _ = conn.Close(ctx) } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 2999c251e93..b633e8cc2ca 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -47,7 +47,7 @@ func TestPickPrimary(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond) @@ -284,7 +284,7 @@ func TestPickLocalPreferences(t *testing.T) { deleteTablet(t, te, tab) } }() - tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options) + tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) require.Equal(t, tp.localCellInfo.localCell, tcase.localCell) require.ElementsMatch(t, tp.cells, tcase.tpCells) @@ -313,7 +313,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) { defer deleteTablet(t, te, want1) // Local cell preference is default - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -348,7 +348,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) // create a tablet in the other cell, it should be picked @@ -370,7 +370,7 @@ func TestPickUsingCellAsAlias(t *testing.T) { // added to the alias. te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell") // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) // Create a tablet in one of the main cells, it should be @@ -399,7 +399,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) { want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) defer deleteTablet(t, te, want1) - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) tablet, err := tp.PickForStreaming(ctx) @@ -442,7 +442,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) delay := GetTabletPickerRetryDelay() @@ -472,10 +472,11 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) { ctx := utils.LeakCheckContext(t) te := newPickerTestEnv(t, ctx, []string{"cell"}) - _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}) + var ignoreTablets map[string]topodatapb.TabletAlias + _, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets) assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -503,7 +504,7 @@ func TestPickErrorOnlySpecified(t *testing.T) { te := newPickerTestEnv(t, ctx, []string{"cell"}) - tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}) + tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) delay := GetTabletPickerRetryDelay() defer func() { @@ -559,7 +560,7 @@ func TestPickFallbackType(t *testing.T) { }) require.NoError(t, err) - tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]topodatapb.TabletAlias)) require.NoError(t, err) ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second) defer cancel2() diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 38706a8fbee..bf9401b5a83 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -473,6 +473,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} + var ignoreTablets map[string]topodatapb.TabletAlias errCount := 0 for { @@ -490,7 +491,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets) if err != nil { log.Errorf(err.Error()) return err @@ -670,10 +671,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE { + if !vs.isRetriableError(err) { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } + ignoreTablets[tablet.Alias.String()] = *tablet.GetAlias() errCount++ if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) @@ -683,6 +685,20 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +func (vs *vstream) isRetriableError(err error) bool { + errCode := vterrors.Code(err) + + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE || errCode == vtrpcpb.Code_NOT_FOUND { + return true + } + + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.HasPrefix(err.Error(), "GTIDSet Mismatch") { + return true + } + + return false +} + // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index e65a0bad253..71ef5e29d87 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error { } func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) { - tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodata.TabletAlias)) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 94e4741eeee..871e3d69c64 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { return nil, err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 654a5bd1588..90ef7116035 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -453,7 +453,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra if cells == nil { cells = append(cells, shard.PrimaryAlias.Cell) } - tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { allErrors.RecordError(err) return diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 85c82bb3574..9a146e629d0 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error { if ts.ExternalTopo() != nil { sourceTopo = ts.ExternalTopo() } - tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}) + tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias)) if err != nil { return err } diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index 475006b2b59..92045fcd245 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -389,6 +389,7 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st discovery.TabletPickerOptions{ CellPreference: "OnlySpecified", }, + make(map[string]topodatapb.TabletAlias), ) if err != nil { return ""