Skip to content

Commit

Permalink
change ignore list to variadic arg, create new child context with sho…
Browse files Browse the repository at this point in the history
…rter timeout

Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Oct 16, 2023
1 parent a1f1af9 commit c052bc2
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 47 deletions.
12 changes: 7 additions & 5 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math/rand"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -135,7 +136,7 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
ignoreTablets map[string]*topodatapb.TabletAlias
ignoreTablets []string
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -145,7 +146,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets map[string]*topodatapb.TabletAlias,
ignoreTablets ...string,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatibility until TabletPickerTabletOrder is fully adopted.
if tabletTypesStr == "" {
Expand Down Expand Up @@ -419,7 +420,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn

tablets := make([]*topo.TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)]
tabletAliasString := topoproto.TabletAliasString(tabletAlias)
tabletInfo, ok := tabletMap[tabletAliasString]
if !ok {
// Either tablet disappeared on us, or we got a partial result
// (GetTabletMap ignores topo.ErrNoNode); just log a warning.
Expand All @@ -436,8 +438,8 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
// if this tablet is not in the ignore list, then add it as a candidate
if _, ok := tp.ignoreTablets[tabletInfo.GetAlias().String()]; !ok {
// If this tablet is not in the ignore list, then add it as a candidate.
if !slices.Contains(tp.ignoreTablets, tabletAliasString) {
tablets = append(tablets, tabletInfo)
}
}
Expand Down
33 changes: 15 additions & 18 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -47,7 +48,7 @@ func TestPickPrimary(t *testing.T) {
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{})
require.NoError(t, err)

ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
Expand Down Expand Up @@ -284,7 +285,7 @@ func TestPickLocalPreferences(t *testing.T) {
deleteTablet(t, te, tab)
}
}()
tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options)
require.NoError(t, err)
require.Equal(t, tp.localCellInfo.localCell, tcase.localCell)
require.ElementsMatch(t, tp.cells, tcase.tpCells)
Expand Down Expand Up @@ -313,7 +314,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) {
defer deleteTablet(t, te, want1)

// Local cell preference is default
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
require.NoError(t, err)

tablet, err := tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -348,7 +349,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) {

// test env puts all cells into an alias called "cella"
te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
require.NoError(t, err)

// create a tablet in the other cell, it should be picked
Expand All @@ -370,7 +371,7 @@ func TestPickUsingCellAsAlias(t *testing.T) {
// added to the alias.
te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell")
// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{})
require.NoError(t, err)

// Create a tablet in one of the main cells, it should be
Expand Down Expand Up @@ -399,14 +400,11 @@ func TestPickWithIgnoreList(t *testing.T) {
want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, want)

noWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, noWant)

ignoreTablets := make(map[string]*topodatapb.TabletAlias)
ignoreTablets[noWant.Alias.String()] = noWant.GetAlias()
dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, dontWant)

// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets)
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, topoproto.TabletAliasString(dontWant.GetAlias()))
require.NoError(t, err)

// Try it many times to be sure we don't ever pick from the ignore list
Expand All @@ -425,7 +423,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(t, te, want1)

tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"})
require.NoError(t, err)

tablet, err := tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -468,7 +466,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) {
ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond)

te := newPickerTestEnv(t, ctx, []string{"cell"})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
require.NoError(t, err)

delay := GetTabletPickerRetryDelay()
Expand Down Expand Up @@ -498,11 +496,10 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) {
ctx := utils.LeakCheckContext(t)

te := newPickerTestEnv(t, ctx, []string{"cell"})
var ignoreTablets map[string]*topodatapb.TabletAlias
_, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets)
_, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{})
assert.EqualError(t, err, "failed to parse list of tablet types: badtype")

tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets)
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
require.NoError(t, err)
delay := GetTabletPickerRetryDelay()
defer func() {
Expand Down Expand Up @@ -530,7 +527,7 @@ func TestPickErrorOnlySpecified(t *testing.T) {

te := newPickerTestEnv(t, ctx, []string{"cell"})

tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"})
require.NoError(t, err)
delay := GetTabletPickerRetryDelay()
defer func() {
Expand Down Expand Up @@ -586,7 +583,7 @@ func TestPickFallbackType(t *testing.T) {
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]*topodatapb.TabletAlias))
tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second)
defer cancel2()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf
if cells == nil {
cells = append(cells, shard.PrimaryAlias.Cell)
}
tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := discovery.NewTabletPicker(ctx, ts.ws.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
allErrors.RecordError(err)
return
Expand Down
30 changes: 20 additions & 10 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -474,7 +475,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make(map[string]*topodatapb.TabletAlias)
ignoreTablets := []string{}

errCount := 0
for {
Expand All @@ -492,12 +493,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var eventss [][]*binlogdatapb.VEvent
var err error
cells := vs.getCells()
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets)

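// Create a separate context with a stricter (60s) timeout for picking a tablet.
// NOTE(review): this is derived from context.Background(), not from ctx, so it is
// not actually a child context and cancelling ctx will not cancel it — confirm
// whether context.WithTimeout(ctx, ...) was intended. The deferred tpCancel also
// appears to sit inside the retry loop, so it would only run when the function
// returns rather than once per iteration — TODO confirm.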
tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer tpCancel()

tp, err := discovery.NewTabletPicker(tpCtx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
if err != nil {
log.Errorf(err.Error())
return err
}
tablet, err := tp.PickForStreaming(ctx)
tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
Expand Down Expand Up @@ -673,16 +679,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}

retry, ignoreTablet := vs.isRetriableError(err)
retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
if ignoreTablet {
ignoreTablets[tablet.Alias.String()] = tablet.GetAlias()
ignoreTablets = append(ignoreTablets, topoproto.TabletAliasString(tablet.GetAlias()))
}

errCount++
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
Expand All @@ -691,20 +698,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

// isRetriable determines whether we should exit immediately or retry the vstream.
// The first return value determines if the error is retriable, the second indicates whether
// shouldRetry determines whether we should exit immediately or retry the vstream.
// The first return value determines if the error can be retried, the second indicates whether
// the tablet on which the error occurred should be omitted from the candidate list of tablets
// to choose from on the retry.
func (vs *vstream) isRetriableError(err error) (bool, bool) {
//
// An error should be retried if it is expected to be transient.
// A tablet should be ignored upon retry if it's likely another tablet will succeed without the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet or if the tablet cannot be found,
// If there is a GTIDSet Mismatch on the tablet,
// omit it from the candidate list in the TabletPicker on retry.
if (errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch")) || errCode == vtrpcpb.Code_NOT_FOUND {
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}

Expand Down
7 changes: 0 additions & 7 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,6 @@ func TestVStreamRetriableErrors(t *testing.T) {
shouldRetry: true,
ignoreTablet: true,
},
{
name: "not found",
code: vtrpcpb.Code_NOT_FOUND,
msg: "",
shouldRetry: true,
ignoreTablet: true,
},
{
name: "unavailable",
code: vtrpcpb.Code_UNAVAILABLE,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodata.TabletAlias))
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
return nil, err
}
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra
if cells == nil {
cells = append(cells, shard.PrimaryAlias.Cell)
}
tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
allErrors.RecordError(err)
return
Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
if ts.ExternalTopo() != nil {
sourceTopo = ts.ExternalTopo()
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
return err
}
Expand All @@ -828,7 +828,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
go func() {
defer wg.Done()
err2 = df.forAll(df.targets, func(shard string, target *shardStreamer) error {
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]*topodatapb.TabletAlias))
tp, err := discovery.NewTabletPicker(ctx, df.ts.TopoServer(), []string{df.targetCell}, df.targetCell, df.ts.TargetKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion tools/rowlog/rowlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st
discovery.TabletPickerOptions{
CellPreference: "OnlySpecified",
},
make(map[string]*topodatapb.TabletAlias),
)
if err != nil {
return ""
Expand Down

0 comments on commit c052bc2

Please sign in to comment.