From 0f21a0bf5b1c69a46bb596f04f58019f5607d893 Mon Sep 17 00:00:00 2001 From: pbibra Date: Mon, 23 Oct 2023 12:42:32 -0700 Subject: [PATCH] allow tablet picker to exclude specified tablets from its candidate list (#14224) Signed-off-by: Priya Bibra --- go/vt/discovery/tablet_picker.go | 23 +++- go/vt/discovery/tablet_picker_test.go | 23 ++++ go/vt/vtgate/vstream_manager.go | 58 +++++++-- go/vt/vtgate/vstream_manager_test.go | 171 +++++++++++++++++++++----- 4 files changed, 230 insertions(+), 45 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 99d95848d19..e5177d81f3f 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -135,6 +135,8 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo + // This map is keyed on the results of TabletAlias.String(). + ignoreTablets map[string]struct{} } // NewTabletPicker returns a TabletPicker. @@ -144,6 +146,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, + ignoreTablets ...*topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. if tabletTypesStr == "" { @@ -219,7 +222,7 @@ func NewTabletPicker( } } - return &TabletPicker{ + tp := &TabletPicker{ ts: ts, cells: dedupeCells(cells), localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, @@ -228,7 +231,15 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, - }, nil + ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + } + + for _, ignoreTablet := range ignoreTablets { + tp.ignoreTablets[ignoreTablet.String()] = struct{}{} + } + + return tp, nil + } // dedupeCells is used to remove duplicates in the cell list in case it is passed in @@ -358,7 +369,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } - aliases = append(aliases, si.PrimaryAlias) + if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { + aliases = append(aliases, si.PrimaryAlias) + } } else { actualCells := make([]string, 0) for _, cell := range tp.cells { @@ -394,7 +407,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn continue } for _, node := range sri.Nodes { - aliases = append(aliases, node.TabletAlias) + if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { + aliases = append(aliases, node.TabletAlias) + } } } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 2999c251e93..ac822124d58 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -391,6 +391,29 @@ func TestPickUsingCellAsAlias(t *testing.T) { } } +func TestPickWithIgnoreList(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"}) + + want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, want) + + dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true) + defer deleteTablet(t, te, dontWant) + + // Specify the alias as the cell. + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias()) + require.NoError(t, err) + + // Try it many times to be sure we don't ever pick from the ignore list. + for i := 0; i < 100; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant) + } +} + func TestPickUsingCellAliasOnlySpecified(t *testing.T) { ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 38706a8fbee..ffb8989ca5d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,17 +27,17 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" ) // vstreamManager manages vstream requests. @@ -53,6 +53,10 @@ type vstreamManager struct { // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set const maxSkewTimeoutSeconds = 10 * 60 +// tabletPickerContextTimeout is the timeout for the child context used to select candidate tablets +// for a vstream +const tabletPickerContextTimeout = 90 * time.Second + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -124,6 +128,7 @@ type journalEvent struct { func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") + return &vstreamManager{ resolver: resolver, toposerv: serv, @@ -473,6 +478,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} + ignoreTablets := make([]*topodatapb.TabletAlias, 0) errCount := 0 for { @@ -490,12 +496,19 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions) + + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) if err != nil { log.Errorf(err.Error()) return err } - tablet, err := tp.PickForStreaming(ctx) + + // Create a child context with a stricter timeout when picking a tablet. + // This will prevent hanging in the case no tablets are found. + tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) + defer tpCancel() + + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) return err @@ -670,11 +683,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly") } - if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE { + + retry, ignoreTablet := vs.shouldRetry(err) + if !retry { log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) return err } + if ignoreTablet { + ignoreTablets = append(ignoreTablets, tablet.GetAlias()) + } + errCount++ + // Retry, at most, 3 times if the error can be retried. if errCount >= 3 { log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err) return err @@ -683,6 +703,30 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +// shouldRetry determines whether we should exit immediately or retry the vstream. +// The first return value determines if the error can be retried, while the second +// indicates whether the tablet with which the error occurred should be ommitted +// from the candidate list of tablets to choose from on the retry. +// +// An error should be retried if it is expected to be transient. +// A tablet should be ignored upon retry if it's likely another tablet will not +// produce the same error. +func (vs *vstream) shouldRetry(err error) (bool, bool) { + errCode := vterrors.Code(err) + + if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE { + return true, false + } + + // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate + // list in the TabletPicker on retry. + if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { + return true, true + } + + return false, false +} + // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 3018791964f..4c1e9ec6764 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -386,46 +386,133 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") } -func TestVStreamRetry(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestVStreamRetriableErrors(t *testing.T) { + type testCase struct { + name string + code vtrpcpb.Code + msg string + shouldRetry bool + ignoreTablet bool + } - cell := "aa" - ks := "TestVStream" - _ = createSandbox(ks) - hc := discovery.NewFakeHealthCheck(nil) + tcases := []testCase{ + { + name: "failed precondition", + code: vtrpcpb.Code_FAILED_PRECONDITION, + msg: "", + shouldRetry: true, + ignoreTablet: false, + }, + { + name: "gtid mismatch", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "GTIDSet Mismatch aa", + shouldRetry: true, + ignoreTablet: true, + }, + { + name: "unavailable", + code: vtrpcpb.Code_UNAVAILABLE, + msg: "", + shouldRetry: true, + ignoreTablet: false, + }, + { + name: "should not retry", + code: vtrpcpb.Code_INVALID_ARGUMENT, + msg: "final error", + shouldRetry: false, + ignoreTablet: false, + }, + } - st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(ctx, hc, st, "aa") - sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) - addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_COMMIT}, } - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) - sbc0.AddVStreamEvents(commit, nil) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) - sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) - var count atomic.Int32 - vgtid := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: ks, - Shard: "-20", - Gtid: "pos", - }}, - } - err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - count.Add(1) - return nil - }) - wantErr := "final error" - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + + want := &binlogdatapb.VStreamResponse{Events: commit} + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // aa will be the local cell for this test, but that tablet will have a vstream error. + cells := []string{"aa", "ab"} + + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopoMultiCell(ctx, cells, ks, []string{"-20"}) + + sbc0 := hc.AddTestTablet(cells[0], "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet(cells[1], "1.1.1.1", 1002, ks, "-20", topodatapb.TabletType_REPLICA, true, 1, nil) + + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc1.Tablet()) + + vsm := newTestVStreamManager(ctx, hc, st, cells[0]) + + // Always have the local cell tablet error so it's ignored on retry and we pick the other one + // if the error requires ignoring the tablet on retry. + sbc0.AddVStreamEvents(nil, vterrors.Errorf(tcase.code, tcase.msg)) + + if tcase.ignoreTablet { + sbc1.AddVStreamEvents(commit, nil) + } else { + sbc0.AddVStreamEvents(commit, nil) + } + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }}, + } + + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + wantErr := "context canceled" + + if !tcase.shouldRetry { + wantErr = tcase.msg + } + + if err == nil || !strings.Contains(err.Error(), wantErr) { + t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + } + close(done) + }() + + Loop: + for { + if tcase.shouldRetry { + select { + case event := <-ch: + got := event.CloneVT() + if !proto.Equal(got, want) { + t.Errorf("got different vstream event than expected") + } + cancel() + case <-done: + // The goroutine has completed, so break out of the loop + break Loop + } + } else { + <-done + break Loop + } + } + }) } - time.Sleep(100 * time.Millisecond) // wait for goroutine within VStream to finish - assert.Equal(t, int32(2), count.Load()) + } func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { @@ -1266,6 +1353,22 @@ func getSandboxTopo(ctx context.Context, cell string, keyspace string, shards [] return st } +func getSandboxTopoMultiCell(ctx context.Context, cells []string, keyspace string, shards []string) *sandboxTopo { + st := newSandboxForCells(ctx, cells) + ts := st.topoServer + + for _, cell := range cells { + ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{}) + } + + ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) + + for _, shard := range shards { + ts.CreateShard(ctx, keyspace, shard) + } + return st +} + func addTabletToSandboxTopo(t *testing.T, ctx context.Context, st *sandboxTopo, ks, shard string, tablet *topodatapb.Tablet) { _, err := st.topoServer.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias