diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index a24637a6edc..eb4c8c13bde 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,7 +17,9 @@ limitations under the License. package discovery import ( + "context" "fmt" + "io" "math/rand" "sort" "strings" @@ -25,20 +27,16 @@ import ( "time" "vitess.io/vitess/go/stats" - + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/log" - - "context" - + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vterrors" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type TabletPickerCellPreference int @@ -280,13 +278,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } -// PickForStreaming picks an available tablet +// PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - rand.Seed(time.Now().UnixNano()) - // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found - // or the context is canceled + // Keep trying at intervals (tabletPickerRetryDelay) until a healthy + // serving tablet is found or the context is cancelled. for { select { case <-ctx.Done(): @@ -302,7 +299,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates) allOtherCandidates = tp.orderByTabletType(allOtherCandidates) } else { - // Randomize candidates + // Randomize candidates. rand.Shuffle(len(sameCellCandidates), func(i, j int) { sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] }) @@ -325,9 +322,9 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table }) } if len(candidates) == 0 { - // if no candidates were found, sleep and try again + // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds", + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { @@ -338,58 +335,49 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - for _, ti := range candidates { - // try to connect to tablet - if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil { - // OK to use ctx here because it is not actually used by the underlying Close implementation - _ = conn.Close(ctx) - log.Infof("tablet picker found tablet %s", ti.Tablet.String()) - return ti.Tablet, nil - } - // err found - log.Warningf("unable to connect to tablet for alias %v", ti.Alias) - } - // Got here? Means we iterated all tablets and did not find a healthy one - tp.incNoTabletFoundStat() + log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) + return candidates[0].Tablet, nil } } -// GetMatchingTablets returns a list of TabletInfo for tablets -// that match the cells, keyspace, shard and tabletTypes for this TabletPicker +// GetMatchingTablets returns a list of TabletInfo for healthy +// serving tablets that match the cells, keyspace, shard and +// tabletTypes for this TabletPicker. func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { - // Special handling for PRIMARY tablet type - // Since there is only one primary, we ignore cell and find the primary + // Special handling for PRIMARY tablet type: since there is only + // one primary per shard, we ignore cell and find the primary. aliases := make([]*topodatapb.TabletAlias, 0) if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY { shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) defer cancel() si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { - log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) + log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } aliases = append(aliases, si.PrimaryAlias) } else { actualCells := make([]string, 0) for _, cell := range tp.cells { - // check if cell is actually an alias - // non-blocking read so that this is fast + // Check if cell is actually an alias; using a + // non-blocking read so that this is fast. shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) defer cancel() _, err := tp.ts.GetCellInfo(shortCtx, cell, false) if err != nil { - // not a valid cell, check whether it is a cell alias + // Not a valid cell, check whether it is a cell alias... shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) defer cancel() alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false) - // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue + // If we get an error, either cellAlias doesn't exist or + // it isn't a cell alias at all; ignore and continue. if err == nil { actualCells = append(actualCells, alias.Cells...) } else { log.Infof("Unable to resolve cell %s, ignoring", cell) } } else { - // valid cell, add it to our list + // Valid cell, add it to our list. actualCells = append(actualCells, cell) } } @@ -397,7 +385,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) defer cancel() - // match cell, keyspace and shard + // Match cell, keyspace, and shard. sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard) if err != nil { continue @@ -412,33 +400,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if len(aliases) == 0 { return nil } + shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) defer cancel() tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) if err != nil { - log.Warningf("error fetching tablets from topo: %v", err) - // If we get a partial result we can still use it, otherwise return + log.Warningf("Error fetching tablets from topo: %v", err) + // If we get a partial result we can still use it, otherwise return. if len(tabletMap) == 0 { return nil } } + tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { - // Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores - // topo.ErrNoNode). Just log a warning - log.Warningf("failed to load tablet %v", tabletAlias) + // Either tablet disappeared on us, or we got a partial result + // (GetTabletMap ignores topo.ErrNoNode); just log a warning. + log.Warningf("Tablet picker failed to load tablet %v", tabletAlias) } else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) { - tablets = append(tablets, tabletInfo) + // Try to connect to the tablet and confirm that it's usable. + if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { + // Ensure that the tablet is healthy and serving. + shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { + if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { + return io.EOF // End the stream + } + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") + }); err == nil || err == io.EOF { + tablets = append(tablets, tabletInfo) + } + _ = conn.Close(ctx) + } } } return tablets } func init() { - // TODO(sougou): consolidate this call to be once per process. - rand.Seed(time.Now().UnixNano()) globalTPStats = newTabletPickerStats() } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 88368c02a60..a6d0e8f4c42 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestPickPrimary(t *testing.T) { @@ -479,6 +480,45 @@ func TestPickErrorOnlySpecified(t *testing.T) { require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } +// TestPickFallbackType tests that when providing a list of tablet types to +// pick from, with the list in preference order, that when the primary/first +// type has no available healthy serving tablets that we select a healthy +// serving tablet from the secondary/second type. +func TestPickFallbackType(t *testing.T) { + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,primary" + options := TabletPickerOptions{ + TabletOrder: "InOrder", + } + te := newPickerTestEnv(t, cells) + + // This one should be selected even though it's the secondary type + // as it is healthy and serving. + primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Replica tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -527,18 +567,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell err := te.topoServ.CreateTablet(context.Background(), tablet) require.NoError(te.t, err) + shr := &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"}, + } if healthy { - _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ - Serving: serving, - Target: &querypb.Target{ - Keyspace: te.keyspace, - Shard: te.shard, - TabletType: tabletType, - }, - RealtimeStats: &querypb.RealtimeStats{HealthError: ""}, - }) + shr.RealtimeStats.HealthError = "" } + _ = createFixedHealthConn(tablet, shr) + return tablet } diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 8ed2ed5e31f..9e6d0346ec8 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -407,9 +407,9 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target, // SandboxSQRowCount is the default number of fake splits returned. var SandboxSQRowCount = int64(10) -// StreamHealth is not implemented. +// StreamHealth always mocks a "healthy" result. func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { - return fmt.Errorf("not implemented in test") + return nil } // ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos. @@ -632,7 +632,7 @@ func (sbc *SandboxConn) getTxReservedID(txID int64) int64 { return sbc.txIDToRID[txID] } -//StringQueries returns the queries executed as a slice of strings +// StringQueries returns the queries executed as a slice of strings func (sbc *SandboxConn) StringQueries() []string { result := make([]string, len(sbc.Queries)) for i, query := range sbc.Queries { diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index f96fe5ba19d..25269bc9198 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "strings" + "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" @@ -30,7 +31,6 @@ import ( "context" - "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" @@ -41,6 +41,13 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +const ( + // How many times to retry tablet selection before we + // give up and return an error message that the user + // can see and act upon if needed. + tabletPickerRetries = 5 +) + var ( // deprecated flags (7.0) _ = flag.Duration("vreplication_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology") @@ -70,7 +77,7 @@ type controller struct { done chan struct{} // The following fields are updated after start. So, they need synchronization. - sourceTablet sync2.AtomicString + sourceTablet atomic.Value lastWorkflowError *lastError } @@ -91,6 +98,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor source: &binlogdatapb.BinlogSource{}, lastWorkflowError: newLastError("VReplication Controller", *maxTimeToRetryError), } + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id @@ -184,7 +192,7 @@ func (ct *controller) run(ctx context.Context) { func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { - ct.sourceTablet.Set("") + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) if x := recover(); x != nil { log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) err = fmt.Errorf("panic: %v", x) @@ -214,23 +222,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { } defer dbClient.Close() - var tablet *topodatapb.Tablet - if ct.source.GetExternalMysql() == "" { - log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) - tablet, err = ct.tabletPicker.PickForStreaming(ctx) - if err != nil { - select { - case <-ctx.Done(): - default: - ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) - ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) - } - return err - } - ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) - log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String()) - ct.sourceTablet.Set(tablet.Alias.String()) + tablet, err := ct.pickSourceTablet(ctx, dbClient) + if err != nil { + return err } + switch { case len(ct.source.Tables) > 0: // Table names can have search patterns. Resolve them against the schema. @@ -281,7 +277,8 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { ct.lastWorkflowError.record(err) // If this is a mysql error that we know needs manual intervention OR - // we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError) + // we cannot identify this as non-recoverable, but it has persisted + // beyond the retry limit (maxTimeToRetryError). if isUnrecoverableError(err) || !ct.lastWorkflowError.shouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil { @@ -306,6 +303,35 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) } return nil } + +// pickSourceTablet picks a healthy serving tablet to source for +// the vreplication stream. If the source is marked as external, it +// returns nil. +func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (*topodatapb.Tablet, error) { + if ct.source.GetExternalMysql() != "" { + return nil, nil + } + log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", + ct.id, ct.workflow) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err := ct.tabletPicker.PickForStreaming(tpCtx) + if err != nil { + select { + case <-ctx.Done(): + default: + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) + ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) + } + return tablet, err + } + ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) + log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", + tablet.Alias.String(), ct.id, ct.workflow) + ct.sourceTablet.Store(tablet.Alias) + return tablet, err +} + func (ct *controller) Stop() { ct.cancel() <-ct.done diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index c8c242bab05..a784028c558 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -27,6 +27,8 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/servenv" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -142,7 +144,10 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Get() + ta := ct.sourceTablet.Load() + if ta != nil { + result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() + } } return result })) @@ -395,7 +400,7 @@ func (st *vrStats) status() *EngineStatus { Counts: ct.blpStats.Timings.Counts(), Rates: ct.blpStats.Rates.Get(), State: ct.blpStats.State.Get(), - SourceTablet: ct.sourceTablet.Get(), + SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), @@ -427,7 +432,7 @@ type ControllerStatus struct { Counts map[string]int64 Rates map[string][]float64 State string - SourceTablet string + SourceTablet *topodatapb.TabletAlias Messages []string QueryCounts map[string]int64 PhaseTimings map[string]int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 2accc3cfa24..b63583e57ee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,6 +28,8 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var wantOut = ` @@ -107,8 +109,14 @@ func TestStatusHtml(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") - testStats.controllers[2].sourceTablet.Set("src2") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) + testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 02, + }) close(testStats.controllers[2].done) tpl := template.Must(template.New("test").Parse(vreplicationTemplate)) @@ -135,7 +143,10 @@ func TestVReplicationStats(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) sleepTime := 1 * time.Millisecond record := func(phase string) { diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 4aeb703799f..1bc29f60880 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -35,6 +35,8 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -47,6 +49,12 @@ import ( _ "vitess.io/vitess/go/vt/vttablet/grpctabletconn" ) +func init() { + // enforce we will use the right protocol (gRPC) in all unit tests + *tmclient.TabletManagerProtocol = "grpc" + *tabletconn.TabletProtocol = "grpc" +} + // This file was copied from testlib. All tests from testlib should be moved // to the current directory. In order to move tests from there, we have to // remove the circular dependency it causes (through vtctl dependence). @@ -80,6 +88,8 @@ type fakeTablet struct { StartHTTPServer bool HTTPListener net.Listener HTTPServer *http.Server + + queryservice.QueryService } // TabletOption is an interface for changing tablet parameters. @@ -140,6 +150,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy Tablet: tablet, FakeMysqlDaemon: fakeMysqlDaemon, RPCServer: grpc.NewServer(), + QueryService: fakes.ErrorQueryService, } } @@ -237,8 +248,14 @@ func (ft *fakeTablet) Target() querypb.Target { } } -func init() { - // enforce we will use the right protocol (gRPC) in all unit tests - *tmclient.TabletManagerProtocol = "grpc" - *tabletconn.TabletProtocol = "grpc" +func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: ft.Tablet.Keyspace, + Shard: ft.Tablet.Shard, + TabletType: ft.Tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) } diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 995442bbd17..892bf0a0cf0 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -18,9 +18,12 @@ package wrangler import ( "fmt" + "math/rand" + "sync" "testing" "time" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/mysql/fakesqldb" @@ -39,6 +42,8 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -74,6 +79,7 @@ type testMigraterEnv struct { sourceKeyRanges []*topodatapb.KeyRange targetKeyRanges []*topodatapb.KeyRange tmeDB *fakesqldb.DB + mu sync.Mutex } // testShardMigraterEnv has some convenience functions for adding expected queries. @@ -135,6 +141,19 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + *tabletconn.TabletProtocol = "grpc" + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -296,6 +315,19 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + *tabletconn.TabletProtocol = "grpc" + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschema.Vindex{