From 676a8446b61d644af614377bf2128f6a557b51dd Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Thu, 27 Jul 2023 11:07:05 -0400 Subject: [PATCH 1/4] VReplication: Make Source Tablet Selection More Robust (#13582) --- go/vt/discovery/tablet_picker.go | 101 ++++----- go/vt/discovery/tablet_picker_test.go | 65 +++++- go/vt/vterrors/last_error.go | 88 ++++++++ .../tabletmanager/vreplication/controller.go | 75 ++++--- .../tabletmanager/vreplication/engine.go | 2 +- .../tabletmanager/vreplication/stats.go | 12 +- .../tabletmanager/vreplication/stats_test.go | 17 +- go/vt/wrangler/fake_tablet_test.go | 25 ++- go/vt/wrangler/traffic_switcher_env_test.go | 199 ++++++++++++++++++ go/vt/wrangler/vdiff_env_test.go | 16 ++ go/vt/wrangler/wrangler.go | 3 + 11 files changed, 506 insertions(+), 97 deletions(-) create mode 100644 go/vt/vterrors/last_error.go diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index c9537d3851e..167708c5e2d 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,7 +17,9 @@ limitations under the License. package discovery import ( + "context" "fmt" + "io" "math/rand" "sort" "strings" @@ -25,20 +27,16 @@ import ( "time" "vitess.io/vitess/go/stats" - + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/log" - - "context" - + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vterrors" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type TabletPickerCellPreference int @@ -291,13 +289,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } -// PickForStreaming picks an available tablet. 
+// PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - rand.Seed(time.Now().UnixNano()) - // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found - // or the context is canceled + // Keep trying at intervals (tabletPickerRetryDelay) until a healthy + // serving tablet is found or the context is cancelled. for { select { case <-ctx.Done(): @@ -330,15 +327,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } else if tp.inOrder { candidates = tp.orderByTabletType(candidates) } else { - // Randomize candidates + // Randomize candidates. rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) } if len(candidates) == 0 { - // if no candidates were found, sleep and try again + // If no viable candidates were found, sleep and try again. 
tp.incNoTabletFoundStat() - log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds", + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { @@ -349,34 +346,24 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - for _, ti := range candidates { - // try to connect to tablet - if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil { - // OK to use ctx here because it is not actually used by the underlying Close implementation - _ = conn.Close(ctx) - log.Infof("tablet picker found tablet %s", ti.Tablet.String()) - return ti.Tablet, nil - } - // err found - log.Warningf("unable to connect to tablet for alias %v", ti.Alias) - } - // Got here? Means we iterated all tablets and did not find a healthy one - tp.incNoTabletFoundStat() + log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) + return candidates[0].Tablet, nil } } -// GetMatchingTablets returns a list of TabletInfo for tablets -// that match the cells, keyspace, shard and tabletTypes for this TabletPicker +// GetMatchingTablets returns a list of TabletInfo for healthy +// serving tablets that match the cells, keyspace, shard and +// tabletTypes for this TabletPicker. func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { - // Special handling for PRIMARY tablet type - // Since there is only one primary, we ignore cell and find the primary + // Special handling for PRIMARY tablet type: since there is only + // one primary per shard, we ignore cell and find the primary. 
aliases := make([]*topodatapb.TabletAlias, 0) if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { - log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) + log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { @@ -385,24 +372,25 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } else { actualCells := make([]string, 0) for _, cell := range tp.cells { - // check if cell is actually an alias - // non-blocking read so that this is fast + // Check if cell is actually an alias; using a + // non-blocking read so that this is fast. shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() _, err := tp.ts.GetCellInfo(shortCtx, cell, false) if err != nil { - // not a valid cell, check whether it is a cell alias + // Not a valid cell, check whether it is a cell alias... shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false) - // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue + // If we get an error, either cellAlias doesn't exist or + // it isn't a cell alias at all; ignore and continue. if err == nil { actualCells = append(actualCells, alias.Cells...) } else { log.Infof("Unable to resolve cell %s, ignoring", cell) } } else { - // valid cell, add it to our list + // Valid cell, add it to our list. 
actualCells = append(actualCells, cell) } } @@ -410,12 +398,11 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - // match cell, keyspace and shard + // Match cell, keyspace, and shard. sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard) if err != nil { continue } - for _, node := range sri.Nodes { if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { aliases = append(aliases, node.TabletAlias) @@ -427,33 +414,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if len(aliases) == 0 { return nil } + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) if err != nil { - log.Warningf("error fetching tablets from topo: %v", err) - // If we get a partial result we can still use it, otherwise return + log.Warningf("Error fetching tablets from topo: %v", err) + // If we get a partial result we can still use it, otherwise return. if len(tabletMap) == 0 { return nil } } + tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { - // Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores - // topo.ErrNoNode). Just log a warning - log.Warningf("failed to load tablet %v", tabletAlias) + // Either tablet disappeared on us, or we got a partial result + // (GetTabletMap ignores topo.ErrNoNode); just log a warning. + log.Warningf("Tablet picker failed to load tablet %v", tabletAlias) } else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) { - tablets = append(tablets, tabletInfo) + // Try to connect to the tablet and confirm that it's usable. 
+ if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { + // Ensure that the tablet is healthy and serving. + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { + if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { + return io.EOF // End the stream + } + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") + }); err == nil || err == io.EOF { + tablets = append(tablets, tabletInfo) + } + _ = conn.Close(ctx) + } } } return tablets } func init() { - // TODO(sougou): consolidate this call to be once per process. - rand.Seed(time.Now().UnixNano()) globalTPStats = newTabletPickerStats() } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index fd2c1635359..91b936303df 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestPickPrimary(t *testing.T) { @@ -503,6 +504,45 @@ func TestPickErrorOnlySpecified(t *testing.T) { require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } +// TestPickFallbackType tests that when providing a list of tablet types to +// pick from, with the list in preference order, that when the primary/first +// type has no available healthy serving tablets that we select a healthy +// serving tablet from the secondary/second type. 
+func TestPickFallbackType(t *testing.T) { + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,primary" + options := TabletPickerOptions{ + TabletOrder: "InOrder", + } + te := newPickerTestEnv(t, cells) + + // This one should be selected even though it's the secondary type + // as it is healthy and serving. + primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Replica tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -551,18 +591,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell err := te.topoServ.CreateTablet(context.Background(), tablet) require.NoError(te.t, err) + shr := &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"}, + } if healthy { - _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ - Serving: serving, - Target: 
&querypb.Target{ - Keyspace: te.keyspace, - Shard: te.shard, - TabletType: tabletType, - }, - RealtimeStats: &querypb.RealtimeStats{HealthError: ""}, - }) + shr.RealtimeStats.HealthError = "" } + _ = createFixedHealthConn(tablet, shr) + return tablet } diff --git a/go/vt/vterrors/last_error.go b/go/vt/vterrors/last_error.go new file mode 100644 index 00000000000..1f051825041 --- /dev/null +++ b/go/vt/vterrors/last_error.go @@ -0,0 +1,88 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vterrors + +import ( + "sync" + "time" + + "vitess.io/vitess/go/vt/log" +) + +/* + * LastError tracks the most recent error for any ongoing process and how long it has persisted. + * The err field should be a vterror to ensure we have meaningful error codes, causes, stack + * traces, etc. 
+ */ +type LastError struct { + name string + err error + firstSeen time.Time + lastSeen time.Time + mu sync.Mutex + maxTimeInError time.Duration // if error persists for this long, ShouldRetry() will return false +} + +func NewLastError(name string, maxTimeInError time.Duration) *LastError { + return &LastError{ + name: name, + maxTimeInError: maxTimeInError, + } +} + +func (le *LastError) Record(err error) { + le.mu.Lock() + defer le.mu.Unlock() + if err == nil { + le.err = nil + le.firstSeen = time.Time{} + le.lastSeen = time.Time{} + return + } + if !Equals(err, le.err) { + le.firstSeen = time.Now() + le.lastSeen = time.Now() + le.err = err + } else { + // same error seen + if time.Since(le.lastSeen) > le.maxTimeInError { + // reset firstSeen, since it has been long enough since the last time we saw this error + log.Infof("Resetting firstSeen for %s, since it is too long since the last one", le.name) + le.firstSeen = time.Now() + } + le.lastSeen = time.Now() + } +} + +func (le *LastError) ShouldRetry() bool { + le.mu.Lock() + defer le.mu.Unlock() + if le.maxTimeInError == 0 { + // The value of 0 means "no time limit" + return true + } + if le.firstSeen.IsZero() { + return true + } + if time.Since(le.firstSeen) <= le.maxTimeInError { + // within the max time range + return true + } + log.Errorf("%s: the same error was encountered continuously since %s, it is now assumed to be unrecoverable; any affected operations will need to be manually restarted once error '%s' has been addressed", + le.name, le.firstSeen.UTC(), le.err) + return false +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 0eed7de71ee..a28804a74c2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -28,8 +28,8 @@ import ( "vitess.io/vitess/go/vt/vterrors" "context" + "sync/atomic" - "vitess.io/vitess/go/sync2" 
"vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" @@ -40,6 +40,13 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +const ( + // How many times to retry tablet selection before we + // give up and return an error message that the user + // can see and act upon if needed. + tabletPickerRetries = 5 +) + // controller is created by Engine. Members are initialized upfront. // There is no mutex within a controller becaust its members are // either read-only or self-synchronized. @@ -59,9 +66,9 @@ type controller struct { done chan struct{} // The following fields are updated after start. So, they need synchronization. - sourceTablet sync2.AtomicString + sourceTablet atomic.Value - lastWorkflowError *lastError + lastWorkflowError *vterrors.LastError } // newController creates a new controller. Unless a stream is explicitly 'Stopped', @@ -79,6 +86,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor done: make(chan struct{}), source: &binlogdatapb.BinlogSource{}, } + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id @@ -88,7 +96,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor } ct.id = uint32(id) ct.workflow = params["workflow"] - ct.lastWorkflowError = newLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError) + ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError) state := params["state"] blpStats.State.Set(state) @@ -173,7 +181,7 @@ func (ct *controller) run(ctx context.Context) { func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { - ct.sourceTablet.Set("") + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) if x := recover(); x != nil { 
log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) err = fmt.Errorf("panic: %v", x) @@ -203,23 +211,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { } defer dbClient.Close() - var tablet *topodatapb.Tablet - if ct.source.GetExternalMysql() == "" { - log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) - tablet, err = ct.tabletPicker.PickForStreaming(ctx) - if err != nil { - select { - case <-ctx.Done(): - default: - ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) - ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) - } - return err - } - ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) - log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String()) - ct.sourceTablet.Set(tablet.Alias.String()) + tablet, err := ct.pickSourceTablet(ctx, dbClient) + if err != nil { + return err } + switch { case len(ct.source.Tables) > 0: // Table names can have search patterns. Resolve them against the schema. @@ -267,10 +263,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre) err = vr.Replicate(ctx) - ct.lastWorkflowError.record(err) + ct.lastWorkflowError.Record(err) + // If this is a mysql error that we know needs manual intervention OR - // we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError) - if isUnrecoverableError(err) || !ct.lastWorkflowError.shouldRetry() { + // we cannot identify this as non-recoverable, but it has persisted + // beyond the retry limit (maxTimeToRetryError). 
+ if isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil { return err // yes, err and not errSetState. @@ -294,6 +292,35 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) } return nil } + +// pickSourceTablet picks a healthy serving tablet to source for +// the vreplication stream. If the source is marked as external, it +// returns nil. +func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (*topodatapb.Tablet, error) { + if ct.source.GetExternalMysql() != "" { + return nil, nil + } + log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", + ct.id, ct.workflow) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err := ct.tabletPicker.PickForStreaming(tpCtx) + if err != nil { + select { + case <-ctx.Done(): + default: + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) + ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) + } + return tablet, err + } + ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) + log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", + tablet.Alias.String(), ct.id, ct.workflow) + ct.sourceTablet.Store(tablet.Alias) + return tablet, err +} + func (ct *controller) Stop() { ct.cancel() <-ct.done diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index afce87aa630..ef0d1376857 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -739,7 +739,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) 
error { return fmt.Errorf("unexpected result: %v", qr) } - // When err is not nil then we got a retryable error and will loop again + // When err is not nil then we got a retryable error and will loop again. if err == nil { current, dcerr := binlogplayer.DecodePosition(qr.Rows[0][0].ToString()) if dcerr != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index c8c242bab05..727d32b9d8d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -27,6 +27,8 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/servenv" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -142,7 +144,10 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Get() + ta := ct.sourceTablet.Load() + if ta != nil { + result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() + } } return result })) @@ -394,8 +399,7 @@ func (st *vrStats) status() *EngineStatus { ReplicationLagSeconds: ct.blpStats.ReplicationLagSeconds.Get(), Counts: ct.blpStats.Timings.Counts(), Rates: ct.blpStats.Rates.Get(), - State: ct.blpStats.State.Get(), - SourceTablet: ct.sourceTablet.Get(), + SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), @@ -427,7 +431,7 @@ type ControllerStatus struct { Counts map[string]int64 Rates map[string][]float64 State string - SourceTablet string + SourceTablet *topodatapb.TabletAlias Messages []string QueryCounts map[string]int64 PhaseTimings map[string]int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 2accc3cfa24..b63583e57ee 100644 --- 
a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,6 +28,8 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var wantOut = ` @@ -107,8 +109,14 @@ func TestStatusHtml(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") - testStats.controllers[2].sourceTablet.Set("src2") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) + testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 02, + }) close(testStats.controllers[2].done) tpl := template.Must(template.New("test").Parse(vreplicationTemplate)) @@ -135,7 +143,10 @@ func TestVReplicationStats(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) sleepTime := 1 * time.Millisecond record := func(phase string) { diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 9fdb6e616a1..254e1813d8d 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -32,6 +32,8 @@ import ( "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -48,6 +50,12 @@ import ( _ "vitess.io/vitess/go/vt/vttablet/grpctabletconn" ) +func init() { + // Ensure we will use the right protocol (gRPC) in all unit tests. 
+ tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") + tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +} + // This file was copied from testlib. All tests from testlib should be moved // to the current directory. In order to move tests from there, we have to // remove the circular dependency it causes (through vtctl dependence). @@ -81,6 +89,8 @@ type fakeTablet struct { StartHTTPServer bool HTTPListener net.Listener HTTPServer *http.Server + + queryservice.QueryService } // TabletOption is an interface for changing tablet parameters. @@ -141,6 +151,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy Tablet: tablet, FakeMysqlDaemon: fakeMysqlDaemon, RPCServer: grpc.NewServer(), + QueryService: fakes.ErrorQueryService, } } @@ -238,8 +249,14 @@ func (ft *fakeTablet) Target() querypb.Target { } } -func init() { - // enforce we will use the right protocol (gRPC) in all unit tests - tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") - tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: ft.Tablet.Keyspace, + Shard: ft.Tablet.Shard, + TabletType: ft.Tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) } diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 02eb5afb377..1ee8e26e4e9 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -18,9 +18,14 @@ package wrangler import ( "fmt" + "math/rand" + "sync" "testing" "time" + "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/mysql/fakesqldb" @@ -30,6 +35,7 @@ import ( "vitess.io/vitess/go/mysql" 
"vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -39,6 +45,9 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -74,6 +83,7 @@ type testMigraterEnv struct { sourceKeyRanges []*topodatapb.KeyRange targetKeyRanges []*topodatapb.KeyRange tmeDB *fakesqldb.DB + mu sync.Mutex } // testShardMigraterEnv has some convenience functions for adding expected queries. @@ -135,6 +145,19 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -260,6 +283,169 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, return tme } +// newTestTablePartialMigrater creates a test tablet migrater +// specifically for partial or shard by shard migrations. +// The shards must be the same on the source and target, and we +// must be moving a subset of them. 
+// fmtQuery should be of the form: 'select a, b %s group by a'. +// The test will Sprintf a from clause and where clause as needed. +func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shardsToMove []string, fmtQuery string) *testMigraterEnv { + require.Greater(t, len(shards), 1, "shard by shard migrations can only be done on sharded keyspaces") + tme := &testMigraterEnv{} + tme.ts = memorytopo.NewServer("cell1", "cell2") + tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient()) + tme.wr.sem = semaphore.NewWeighted(1) + tme.sourceShards = shards + tme.targetShards = shards + tme.tmeDB = fakesqldb.New(t) + expectVDiffQueries(tme.tmeDB) + tabletID := 10 + for _, shard := range tme.sourceShards { + tme.sourcePrimaries = append(tme.sourcePrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks1", shard))) + tabletID += 10 + + _, sourceKeyRange, err := topo.ValidateShardName(shard) + if err != nil { + t.Fatal(err) + } + tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange) + } + tpChoiceTablet := tme.sourcePrimaries[0].Tablet + tpChoice = &testTabletPickerChoice{ + keyspace: tpChoiceTablet.Keyspace, + shard: tpChoiceTablet.Shard, + } + for _, shard := range tme.targetShards { + tme.targetPrimaries = append(tme.targetPrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks2", shard))) + tabletID += 10 + + _, targetKeyRange, err := topo.ValidateShardName(shard) + if err != nil { + t.Fatal(err) + } + tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) + } + + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range 
append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + err := tme.ts.SaveVSchema(ctx, "ks1", vs) + require.NoError(t, err) + err = tme.ts.SaveVSchema(ctx, "ks2", vs) + require.NoError(t, err) + err = tme.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false) + require.NoError(t, err) + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks2", []string{"cell1"}, false) + require.NoError(t, err) + + tme.startTablets(t) + tme.createDBClients(ctx, t) + tme.setPrimaryPositions() + now := time.Now().Unix() + + for i, shard := range shards { + for _, shardToMove := range shardsToMove { + var streamInfoRows []string + var streamExtInfoRows []string + if shardToMove == shard { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)), + }}, + }, + } + streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) + streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now)) + } + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, 
i+1), noResult) + tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + streamInfoRows...)) + tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"), + streamExtInfoRows...)) + tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"), + streamExtInfoRows...)) + } + } + + for i, shard := range shards { + for _, shardToMove := range shardsToMove { + var streamInfoRows []string + if shardToMove == shard { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks2", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)), + }}, + }, + } + streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + } + tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + 
"int64|varchar|varchar|varchar|varchar"), + streamInfoRows...), + ) + } + } + + tme.targetKeyspace = "ks2" + return tme +} + func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targetShards []string) *testShardMigraterEnv { tme := &testShardMigraterEnv{} tme.ts = memorytopo.NewServer("cell1", "cell2") @@ -296,6 +482,19 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschema.Vindex{ diff --git a/go/vt/wrangler/vdiff_env_test.go b/go/vt/wrangler/vdiff_env_test.go index 1d50f4dc28a..ca456867340 100644 --- a/go/vt/wrangler/vdiff_env_test.go +++ b/go/vt/wrangler/vdiff_env_test.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/logutil" @@ -342,3 +343,18 @@ func (tmc *testVDiffTMClient) PrimaryPosition(ctx context.Context, tablet *topod } return pos, nil } + +func expectVDiffQueries(db *fakesqldb.DB) { + res := &sqltypes.Result{} + queries := []string{ + "USE `vt_ks`", + "USE `vt_ks1`", + "USE `vt_ks2`", + "optimize table _vt.copy_state", + "alter table _vt.copy_state auto_increment = 1", + } + for _, query := range queries { + db.AddQuery(query, res) + } + db.AddQueryPattern("delete from vd, vdt, vdl.*", res) +} diff --git 
a/go/vt/wrangler/wrangler.go b/go/vt/wrangler/wrangler.go index ea04baf6569..1bf3763f7f6 100644 --- a/go/vt/wrangler/wrangler.go +++ b/go/vt/wrangler/wrangler.go @@ -21,6 +21,7 @@ package wrangler import ( "context" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" @@ -53,6 +54,8 @@ type Wrangler struct { // VExecFunc is a test-only fixture that allows us to short circuit vexec commands. // DO NOT USE in production code. VExecFunc func(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) + // Limit the number of concurrent background goroutines if needed. + sem *semaphore.Weighted } // New creates a new Wrangler object. From 100653d8327e0b6f59a2419fa24ef6b3fd7e88b1 Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Tue, 27 Aug 2024 15:49:50 -0500 Subject: [PATCH 2/4] update sprintf args. --- go/vt/wrangler/traffic_switcher_env_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 1ee8e26e4e9..963be5572ab 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -398,7 +398,7 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now)) } - tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1), noResult) tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|source|message|cell|tablet_types", "int64|varchar|varchar|varchar|varchar"), @@ -432,7 +432,7 @@ func newTestTablePartialMigrater(ctx context.Context, 
t *testing.T, shards, shar }, } streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) - tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1), noResult) } tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|source|message|cell|tablet_types", From 8f78ade879f3e9afec7f295365e3925687bf1200 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 21 Aug 2024 20:10:23 +0200 Subject: [PATCH 3/4] `slack-15.0`: pre-backport `txthrottler` crash fixes (#480) * `txthrottler`: move `ThrottlerInterface` to `go/vt/throttler`, use `slices` pkg, add stats (#16248) Signed-off-by: Tim Vaillancourt * revert to `reflect` Signed-off-by: Tim Vaillancourt * Support passing filters to `discovery.NewHealthCheck(...)` Signed-off-by: Tim Vaillancourt * Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go Co-authored-by: Matt Lord Signed-off-by: Tim Vaillancourt * Address some PR suggestions Signed-off-by: Tim Vaillancourt * PR ctx suggestion Signed-off-by: Tim Vaillancourt * fix test Signed-off-by: Tim Vaillancourt * simplify updateHealthCheckCells signature Signed-off-by: Tim Vaillancourt * Fix race in `replicationLagModule` of `go/vt/throttle` Signed-off-by: Tim Vaillancourt --------- Signed-off-by: Tim Vaillancourt Co-authored-by: Matt Lord --- go/vt/discovery/healthcheck.go | 52 +++++---- go/vt/discovery/healthcheck_test.go | 79 +++++++++++++- go/vt/discovery/keyspace_events_test.go | 4 +- go/vt/throttler/demo/throttler_demo.go | 6 +- go/vt/throttler/manager.go | 10 +- go/vt/throttler/manager_test.go | 2 +- go/vt/throttler/max_replication_lag_module.go | 20 ++-- go/vt/throttler/replication_lag_cache.go | 40 +++++++ go/vt/throttler/replication_lag_cache_test.go | 9 ++ go/vt/throttler/result.go | 24 ++-- go/vt/throttler/result_test.go | 20 ++-- go/vt/throttler/throttler.go | 74 +++++++------ 
go/vt/throttler/throttler_test.go | 95 +++++++++++++++- .../throttlerclient_testsuite.go | 2 +- go/vt/throttler/throttlerlogz.go | 2 +- go/vt/throttler/throttlerlogz_test.go | 8 +- go/vt/vtctld/vtctld.go | 2 +- go/vt/vtgate/tabletgateway.go | 6 +- .../txthrottler/mock_throttler_test.go | 103 ++++++++++-------- .../tabletserver/txthrottler/tx_throttler.go | 91 +++++++--------- .../txthrottler/tx_throttler_test.go | 15 ++- 21 files changed, 453 insertions(+), 211 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 6374ecd5078..612323ca9b7 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -35,6 +35,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "hash/crc32" "html/template" @@ -98,6 +99,9 @@ var ( // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond + + // errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined. + errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time") ) // See the documentation for NewHealthCheck below for an explanation of these parameters. @@ -296,6 +300,27 @@ type HealthCheckImpl struct { healthCheckDialSem *semaphore.Weighted } +// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate. 
+func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) { + if len(tabletFilters) > 0 { + if len(KeyspacesToWatch) > 0 { + return nil, errKeyspacesToWatchAndTabletFilters + } + + fbs, err := NewFilterByShard(tabletFilters) + if err != nil { + return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err) + } + filters = append(filters, fbs) + } else if len(KeyspacesToWatch) > 0 { + filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) + } + if len(tabletFilterTags) > 0 { + filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) + } + return filters, nil +} + // NewHealthCheck creates a new HealthCheck object. // Parameters: // retryDelay. @@ -317,10 +342,14 @@ type HealthCheckImpl struct { // // The localCell for this healthcheck // -// callback. +// cellsToWatch. // -// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering. -func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl { +// Is a list of cells to watch for tablets. +// +// filters. +// +// Is one or more filters to apply when determining what tablets we want to stream healthchecks from. 
+func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl { log.Infof("loading tablets for cells: %v", cellsToWatch) hc := &HealthCheckImpl{ @@ -342,27 +371,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } for _, c := range cells { - var filters TabletFilters log.Infof("Setting up healthcheck for cell: %v", c) if c == "" { continue } - if len(tabletFilters) > 0 { - if len(KeyspacesToWatch) > 0 { - log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") - } - - fbs, err := NewFilterByShard(tabletFilters) - if err != nil { - log.Exitf("Cannot parse tablet_filters parameter: %v", err) - } - filters = append(filters, fbs) - } else if len(KeyspacesToWatch) > 0 { - filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) - } - if len(tabletFilterTags) > 0 { - filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) - } topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 35cd1f17d05..28a0dcf91fe 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -64,6 +64,77 @@ func init() { refreshInterval = time.Minute } +func TestNewVTGateHealthCheckFilters(t *testing.T) { + defer func() { + KeyspacesToWatch = nil + tabletFilters = nil + tabletFilterTags = nil + }() + + testCases := []struct { + name string + keyspacesToWatch []string + tabletFilters []string + tabletFilterTags map[string]string + expectedError string + expectedFilterTypes []any + }{ + { + name: "noFilters", + }, + { + name: "tabletFilters", + tabletFilters: []string{"ks1|-80"}, + expectedFilterTypes: []any{&FilterByShard{}}, + }, + { + name: "keyspacesToWatch", + 
keyspacesToWatch: []string{"ks1"}, + expectedFilterTypes: []any{&FilterByKeyspace{}}, + }, + { + name: "tabletFiltersAndTags", + tabletFilters: []string{"ks1|-80"}, + tabletFilterTags: map[string]string{"test": "true"}, + expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}}, + }, + { + name: "keyspacesToWatchAndTags", + tabletFilterTags: map[string]string{"test": "true"}, + keyspacesToWatch: []string{"ks1"}, + expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}}, + }, + { + name: "failKeyspacesToWatchAndFilters", + tabletFilters: []string{"ks1|-80"}, + keyspacesToWatch: []string{"ks1"}, + expectedError: errKeyspacesToWatchAndTabletFilters.Error(), + }, + { + name: "failInvalidTabletFilters", + tabletFilters: []string{"shouldfail!@#!"}, + expectedError: "failed to parse tablet_filters value \"shouldfail!@#!\": invalid FilterByShard parameter: shouldfail!@#!", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + KeyspacesToWatch = testCase.keyspacesToWatch + tabletFilters = testCase.tabletFilters + tabletFilterTags = testCase.tabletFilterTags + + filters, err := NewVTGateHealthCheckFilters() + if testCase.expectedError != "" { + assert.EqualError(t, err, testCase.expectedError) + } + assert.Len(t, filters, len(testCase.expectedFilterTypes)) + for i, filter := range filters { + assert.IsType(t, testCase.expectedFilterTypes[i], filter) + } + }) + } +} + func TestHealthCheck(t *testing.T) { // reset error counters hcErrorCounters.ResetAll() @@ -943,7 +1014,7 @@ func TestGetHealthyTablets(t *testing.T) { func TestPrimaryInOtherCell(t *testing.T) { ts := memorytopo.NewServer("cell1", "cell2") - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() // add a tablet as primary in different cell @@ -1000,7 +1071,7 @@ func 
TestPrimaryInOtherCell(t *testing.T) { func TestReplicaInOtherCell(t *testing.T) { ts := memorytopo.NewServer("cell1", "cell2") - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() // add a tablet as replica @@ -1102,7 +1173,7 @@ func TestReplicaInOtherCell(t *testing.T) { func TestCellAliases(t *testing.T) { ts := memorytopo.NewServer("cell1", "cell2") - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() cellsAlias := &topodatapb.CellsAlias{ @@ -1248,7 +1319,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic } func createTestHc(ts *topo.Server) *HealthCheckImpl { - return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "") + return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "", nil) } type fakeConn struct { diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index 652e4ff7c7b..bd1b6def62f 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -39,7 +39,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) { factory.AddCell(cell) ts := faketopo.NewFakeTopoServer(factory) ts2 := &fakeTopoServer{} - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil) defer hc.Close() kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell) kss := &keyspaceState{ @@ -82,7 +82,7 @@ func TestKeyspaceEventTypes(t *testing.T) { factory.AddCell(cell) ts := faketopo.NewFakeTopoServer(factory) ts2 := 
&fakeTopoServer{} - hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "") + hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil) defer hc.Close() kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell) diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go index 3593bc0806d..91d7e45f92c 100644 --- a/go/vt/throttler/demo/throttler_demo.go +++ b/go/vt/throttler/demo/throttler_demo.go @@ -101,7 +101,7 @@ type replica struct { // throttler is used to enforce the maximum rate at which replica applies // transactions. It must not be confused with the client's throttler. - throttler *throttler.Throttler + throttler throttler.Throttler lastHealthUpdate time.Time lagUpdateInterval time.Duration @@ -224,7 +224,7 @@ type client struct { primary *primary healthCheck discovery.HealthCheck - throttler *throttler.Throttler + throttler throttler.Throttler stopChan chan struct{} wg sync.WaitGroup @@ -237,7 +237,7 @@ func newClient(primary *primary, replica *replica, ts *topo.Server) *client { log.Fatal(err) } - healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "") + healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "", nil) c := &client{ primary: primary, healthCheck: healthCheck, diff --git a/go/vt/throttler/manager.go b/go/vt/throttler/manager.go index c2ee9f0a652..ee142190f75 100644 --- a/go/vt/throttler/manager.go +++ b/go/vt/throttler/manager.go @@ -64,16 +64,16 @@ type managerImpl struct { // mu guards all fields in this group. mu sync.Mutex // throttlers tracks all running throttlers (by their name). 
- throttlers map[string]*Throttler + throttlers map[string]Throttler } func newManager() *managerImpl { return &managerImpl{ - throttlers: make(map[string]*Throttler), + throttlers: make(map[string]Throttler), } } -func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error { +func (m *managerImpl) registerThrottler(name string, throttler Throttler) error { m.mu.Lock() defer m.mu.Unlock() @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string { // log returns the most recent changes of the MaxReplicationLag module. // There will be one result for each processed replication lag record. -func (m *managerImpl) log(throttlerName string) ([]result, error) { +func (m *managerImpl) log(throttlerName string) ([]Result, error) { m.mu.Lock() defer m.mu.Unlock() @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) { return nil, fmt.Errorf("throttler: %v does not exist", throttlerName) } - return t.log(), nil + return t.Log(), nil } diff --git a/go/vt/throttler/manager_test.go b/go/vt/throttler/manager_test.go index 8c0e6ae4563..a483ce9dc8f 100644 --- a/go/vt/throttler/manager_test.go +++ b/go/vt/throttler/manager_test.go @@ -37,7 +37,7 @@ var ( type managerTestFixture struct { m *managerImpl - t1, t2 *Throttler + t1, t2 Throttler } func (f *managerTestFixture) setUp() error { diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index e1a76f89c57..f94f6fabf4a 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -312,7 +312,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec m.memory.ageBadRate(now) - r := result{ + r := Result{ Now: now, RateChange: unchangedRate, lastRateChange: m.lastRateChange, @@ -445,7 +445,7 @@ func stateGreater(a, b state) bool { // and we should not skip the current replica ("lagRecordNow"). 
// Even if it's the same replica we may skip it and return false because // we want to wait longer for the propagation of the current rate change. -func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { +func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { if m.replicaUnderTest == nil { return true } @@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t return true } -func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown) oldRate := m.rate.Get() @@ -559,7 +559,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa return minDuration } -func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { // Guess replication rate based on the difference in the replication lag of this // particular replica. lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange) @@ -630,7 +630,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, // guessReplicationRate guesses the actual replication rate based on the new bac // Note that "lagDifference" can be positive (lag increased) or negative (lag // decreased). 
-func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { +func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { // avgReplicationRate is the average rate (per second) at which the replica // applied transactions from the replication stream. We infer the value // from the relative change in the replication lag. @@ -675,14 +675,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate return int64(newRate), reason } -func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown) decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec) m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason) } -func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { +func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { oldRate := m.rate.Get() rate := int64(float64(oldRate) - float64(oldRate)*decrease) if rate == 0 { @@ -694,7 +694,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases()) } -func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, 
testDuration time.Duration) { +func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { oldRate := m.rate.Get() m.currentState = newState @@ -722,7 +722,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int // markCurrentRateAsBadOrGood determines the actual rate between the last rate // change and "now" and determines if that rate was bad or good. -func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) { +func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) { if m.lastRateChange.IsZero() { // Module was just started. We don't have any data points yet. r.GoodOrBad = ignoredRate @@ -796,6 +796,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time } } -func (m *MaxReplicationLagModule) log() []result { +func (m *MaxReplicationLagModule) log() []Result { return m.results.latestValues() } diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go index c9c2e94f113..ab26c0bc6b8 100644 --- a/go/vt/throttler/replication_lag_cache.go +++ b/go/vt/throttler/replication_lag_cache.go @@ -18,6 +18,7 @@ package throttler import ( "sort" + "sync" "time" "vitess.io/vitess/go/vt/discovery" @@ -30,6 +31,8 @@ type replicationLagCache struct { // The map key is replicationLagRecord.LegacyTabletStats.Key. entries map[string]*replicationLagHistory + mu sync.Mutex + // slowReplicas is a set of slow replicas. // The map key is replicationLagRecord.LegacyTabletStats.Key. 
// This map will always be recomputed by sortByLag() and must not be modified @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache // add inserts or updates "r" in the cache for the replica with the key "r.Key". func (c *replicationLagCache) add(r replicationLagRecord) { + c.mu.Lock() + defer c.mu.Unlock() + if !r.Serving { // Tablet is down. Do no longer track it. delete(c.entries, discovery.TabletToMapKey(r.Tablet)) @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) { entry.add(r) } +// maxLag returns the maximum replication lag for the entries in cache. +func (c *replicationLagCache) maxLag() (maxLag uint32) { + c.mu.Lock() + defer c.mu.Unlock() + + for key := range c.entries { + if c.isIgnored(key) { + continue + } + + entry, ok := c.entries[key] + if !ok { + continue + } + + latest := entry.latest() + if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // latest returns the current lag record for the given LegacyTabletStats.Key string. // A zero record is returned if there is no latest entry. func (c *replicationLagCache) latest(key string) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord { // or just after it. // If there is no such record, a zero record is returned. func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag // sortByLag sorts all replicas by their latest replication lag value and // tablet uid and updates the c.slowReplicas set. 
func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) { + c.mu.Lock() + defer c.mu.Unlock() + // Reset the current list of ignored replicas. c.slowReplicas = make(map[string]bool) @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool { // this slow replica. // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key. func (c *replicationLagCache) ignoreSlowReplica(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.slowReplicas) == 0 { // No slow replicas at all. return false diff --git a/go/vt/throttler/replication_lag_cache_test.go b/go/vt/throttler/replication_lag_cache_test.go index 312f97e1999..9b34210d096 100644 --- a/go/vt/throttler/replication_lag_cache_test.go +++ b/go/vt/throttler/replication_lag_cache_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/discovery" ) @@ -91,3 +93,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) { t.Fatal("r1 should be tracked as a slow replica") } } + +func TestReplicationLagCache_MaxLag(t *testing.T) { + c := newReplicationLagCache(2) + c.add(lagRecord(sinceZero(1*time.Second), r1, 30)) + c.add(lagRecord(sinceZero(1*time.Second), r2, 1)) + require.Equal(t, uint32(30), c.maxLag()) +} diff --git a/go/vt/throttler/result.go b/go/vt/throttler/result.go index 179711116a3..8af02e58a3b 100644 --- a/go/vt/throttler/result.go +++ b/go/vt/throttler/result.go @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}} lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}} reason: {{.Reason}}`)) -// result is generated by the MaxReplicationLag module for each processed +// Result is generated by the MaxReplicationLag module for each processed // "replicationLagRecord". 
// It captures the details and the decision of the processing. -type result struct { +type Result struct { Now time.Time RateChange rateChange lastRateChange time.Time @@ -80,7 +80,7 @@ type result struct { GuessedReplicationBacklogNew int } -func (r result) String() string { +func (r Result) String() string { var b bytes.Buffer if err := resultStringTemplate.Execute(&b, r); err != nil { panic(fmt.Sprintf("failed to Execute() template: %v", err)) @@ -88,25 +88,25 @@ func (r result) String() string { return b.String() } -func (r result) Alias() string { +func (r Result) Alias() string { return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias) } -func (r result) TimeSinceLastRateChange() string { +func (r Result) TimeSinceLastRateChange() string { if r.lastRateChange.IsZero() { return "n/a" } return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds()) } -func (r result) LagBefore() string { +func (r Result) LagBefore() string { if r.LagRecordBefore.isZero() { return "n/a" } return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds) } -func (r result) AgeOfBeforeLag() string { +func (r Result) AgeOfBeforeLag() string { if r.LagRecordBefore.isZero() { return "n/a" } @@ -123,18 +123,18 @@ type resultRing struct { // started reusing entries. wrapped bool // values is the underlying ring buffer. - values []result + values []Result } // newResultRing creates a new resultRing. func newResultRing(capacity int) *resultRing { return &resultRing{ - values: make([]result, capacity), + values: make([]Result, capacity), } } // add inserts a new result into the ring buffer. -func (rr *resultRing) add(r result) { +func (rr *resultRing) add(r Result) { rr.mu.Lock() defer rr.mu.Unlock() @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) { // latestValues returns all values of the buffer. Entries are sorted in reverse // chronological order i.e. newer items come first. 
-func (rr *resultRing) latestValues() []result { +func (rr *resultRing) latestValues() []Result { rr.mu.Lock() defer rr.mu.Unlock() @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result { count = rr.position } - results := make([]result, count) + results := make([]Result, count) for i := 0; i < count; i++ { pos := start - i if pos < 0 { diff --git a/go/vt/throttler/result_test.go b/go/vt/throttler/result_test.go index 9efc7df9412..9eadab503e8 100644 --- a/go/vt/throttler/result_test.go +++ b/go/vt/throttler/result_test.go @@ -23,7 +23,7 @@ import ( ) var ( - resultIncreased = result{ + resultIncreased = Result{ Now: sinceZero(1234 * time.Millisecond), RateChange: increasedRate, lastRateChange: sinceZero(1 * time.Millisecond), @@ -45,7 +45,7 @@ var ( GuessedReplicationBacklogOld: 0, GuessedReplicationBacklogNew: 0, } - resultDecreased = result{ + resultDecreased = Result{ Now: sinceZero(5000 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(1234 * time.Millisecond), @@ -67,7 +67,7 @@ var ( GuessedReplicationBacklogOld: 10, GuessedReplicationBacklogNew: 20, } - resultEmergency = result{ + resultEmergency = Result{ Now: sinceZero(10123 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(5000 * time.Millisecond), @@ -93,7 +93,7 @@ var ( func TestResultString(t *testing.T) { testcases := []struct { - r result + r Result want string }{ { @@ -135,27 +135,27 @@ reason: emergency state decreased the rate`, func TestResultRing(t *testing.T) { // Test data. - r1 := result{Reason: "r1"} - r2 := result{Reason: "r2"} - r3 := result{Reason: "r3"} + r1 := Result{Reason: "r1"} + r2 := Result{Reason: "r2"} + r3 := Result{Reason: "r3"} rr := newResultRing(2) // Use the ring partially. rr.add(r1) - if got, want := rr.latestValues(), []result{r1}; !reflect.DeepEqual(got, want) { + if got, want := rr.latestValues(), []Result{r1}; !reflect.DeepEqual(got, want) { t.Fatalf("items not correctly added to resultRing. 
got = %v, want = %v", got, want) } // Use it fully. rr.add(r2) - if got, want := rr.latestValues(), []result{r2, r1}; !reflect.DeepEqual(got, want) { + if got, want := rr.latestValues(), []Result{r2, r1}; !reflect.DeepEqual(got, want) { t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want) } // Let it wrap. rr.add(r3) - if got, want := rr.latestValues(), []result{r3, r2}; !reflect.DeepEqual(got, want) { + if got, want := rr.latestValues(), []Result{r3, r2}; !reflect.DeepEqual(got, want) { t.Fatalf("resultRing did not wrap correctly. got = %v, want = %v", got, want) } } diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 68905db1ad5..cd237548b3b 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -66,7 +67,22 @@ const ( InvalidMaxReplicationLag = -1 ) -// Throttler provides a client-side, thread-aware throttler. +// Throttler defines the throttler interface. +type Throttler interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, th *discovery.TabletHealth) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 + Log() []Result +} + +// ThrottlerImpl implements a client-side, thread-aware throttler. // See the package doc for more information. // // Calls of Throttle() and ThreadFinished() take threadID as parameter which is @@ -74,7 +90,7 @@ const ( // NOTE: Trottle() and ThreadFinished() assume that *per thread* calls to them // // are serialized and must not happen concurrently. 
-type Throttler struct { +type ThrottlerImpl struct { // name describes the Throttler instance and is used e.g. in the webinterface. name string // unit describes the entity the throttler is limiting e.g. "queries" or @@ -127,15 +143,15 @@ type Throttler struct { // unit refers to the type of entity you want to throttle e.g. "queries" or // "transactions". // name describes the Throttler instance and will be used by the webinterface. -func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (*Throttler, error) { +func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (Throttler, error) { return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now) } -func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { +func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error) { return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc) } -func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { +func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (Throttler, error) { config := NewMaxReplicationLagModuleConfig(maxReplicationLag) config.MaxReplicationLagSec = maxReplicationLag @@ -143,7 +159,7 @@ func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxR } -func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig 
MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { +func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error) { err := maxReplicationLagModuleConfig.Verify() if err != nil { return nil, fmt.Errorf("invalid max replication lag config: %w", err) @@ -176,7 +192,7 @@ func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount threadThrottlers[i] = newThreadThrottler(i, actualRateHistory) runningThreads[i] = true } - t := &Throttler{ + t := &ThrottlerImpl{ name: name, unit: unit, manager: manager, @@ -215,7 +231,7 @@ func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount // the backoff duration elapsed. // The maximum value for the returned backoff is 1 second since the throttler // internally operates on a per-second basis. -func (t *Throttler) Throttle(threadID int) time.Duration { +func (t *ThrottlerImpl) Throttle(threadID int) time.Duration { if t.closed { panic(fmt.Sprintf("BUG: thread with ID: %v must not access closed Throttler", threadID)) } @@ -227,30 +243,18 @@ func (t *Throttler) Throttle(threadID int) time.Duration { // MaxLag returns the max of all the last replication lag values seen across all tablets of // the provided type, excluding ignored tablets. 
-func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { +func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) - - var maxLag uint32 - cacheEntries := cache.entries - - for key := range cacheEntries { - if cache.isIgnored(key) { - continue - } - - lag := cache.latest(key).Stats.ReplicationLagSeconds - if lag > maxLag { - maxLag = lag - } + if cache == nil { + return 0 } - - return maxLag + return cache.maxLag() } // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. -func (t *Throttler) ThreadFinished(threadID int) { +func (t *ThrottlerImpl) ThreadFinished(threadID int) { if t.threadFinished[threadID] { panic(fmt.Sprintf("BUG: thread with ID: %v already finished", threadID)) } @@ -265,7 +269,7 @@ func (t *Throttler) ThreadFinished(threadID int) { // Close stops all modules and frees all resources. // When Close() returned, the Throttler object must not be used anymore. -func (t *Throttler) Close() { +func (t *ThrottlerImpl) Close() { for _, m := range t.modules { m.Stop() } @@ -278,7 +282,7 @@ func (t *Throttler) Close() { // threadThrottlers accordingly. // The rate changes when the number of thread changes or a module updated its // max rate. -func (t *Throttler) updateMaxRate() { +func (t *ThrottlerImpl) updateMaxRate() { // Set it to infinite initially. maxRate := int64(math.MaxInt64) @@ -319,39 +323,39 @@ func (t *Throttler) updateMaxRate() { } // MaxRate returns the current rate of the MaxRateModule. -func (t *Throttler) MaxRate() int64 { +func (t *ThrottlerImpl) MaxRate() int64 { return t.maxRateModule.MaxRate() } // SetMaxRate updates the rate of the MaxRateModule. 
-func (t *Throttler) SetMaxRate(rate int64) { +func (t *ThrottlerImpl) SetMaxRate(rate int64) { t.maxRateModule.SetMaxRate(rate) } // RecordReplicationLag must be called by users to report the "ts" tablet health // data observed at "time". // Note: After Close() is called, this method must not be called anymore. -func (t *Throttler) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { +func (t *ThrottlerImpl) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { t.maxReplicationLagModule.RecordReplicationLag(time, th) } // GetConfiguration returns the configuration of the MaxReplicationLag module. -func (t *Throttler) GetConfiguration() *throttlerdatapb.Configuration { +func (t *ThrottlerImpl) GetConfiguration() *throttlerdatapb.Configuration { return t.maxReplicationLagModule.getConfiguration() } // UpdateConfiguration updates the configuration of the MaxReplicationLag module. -func (t *Throttler) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error { +func (t *ThrottlerImpl) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error { return t.maxReplicationLagModule.updateConfiguration(configuration, copyZeroValues) } // ResetConfiguration resets the configuration of the MaxReplicationLag module // to its initial settings. -func (t *Throttler) ResetConfiguration() { +func (t *ThrottlerImpl) ResetConfiguration() { t.maxReplicationLagModule.resetConfiguration() } -// log returns the most recent changes of the MaxReplicationLag module. -func (t *Throttler) log() []result { +// Log returns the most recent changes of the MaxReplicationLag module. 
+func (t *ThrottlerImpl) Log() []Result { return t.maxReplicationLagModule.log() } diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index 0bb0ed0387a..e7e7c13c466 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -17,10 +17,18 @@ limitations under the License. package throttler import ( + "context" "runtime" "strings" + "sync" "testing" "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" ) // The main purpose of the benchmarks below is to demonstrate the functionality @@ -162,7 +170,7 @@ func sinceZero(sinceZero time.Duration) time.Time { // threadThrottler.newThreadThrottler() for more details. // newThrottlerWithClock should only be used for testing. -func newThrottlerWithClock(name, unit string, threadCount int, maxRate int64, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { +func newThrottlerWithClock(name, unit string, threadCount int, maxRate int64, maxReplicationLag int64, nowFunc func() time.Time) (Throttler, error) { return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, nowFunc) } @@ -274,14 +282,16 @@ func TestThreadFinished(t *testing.T) { // Max rate update to threadThrottlers happens asynchronously. Wait for it. timer := time.NewTimer(2 * time.Second) + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) for { - if throttler.threadThrottlers[0].getMaxRate() == 2 { + if throttlerImpl.threadThrottlers[0].getMaxRate() == 2 { timer.Stop() break } select { case <-timer.C: - t.Fatalf("max rate was not propapgated to threadThrottler[0] in time: %v", throttler.threadThrottlers[0].getMaxRate()) + t.Fatalf("max rate was not propapgated to threadThrottler[0] in time: %v", throttlerImpl.threadThrottlers[0].getMaxRate()) default: // Timer not up yet. Try again. 
} @@ -389,7 +399,9 @@ func TestUpdateMaxRate_AllThreadsFinished(t *testing.T) { throttler.ThreadFinished(1) // Make sure that there's no division by zero error (threadsRunning == 0). - throttler.updateMaxRate() + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) + throttlerImpl.updateMaxRate() // We don't care about the Throttler state at this point. } @@ -426,3 +438,78 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { }() throttler.ThreadFinished(0) } + +func TestThrottlerMaxLag(t *testing.T) { + fc := &fakeClock{} + throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) + require.NoError(t, err) + defer throttler.Close() + + require.NotNil(t, throttler) + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) + require.NotNil(t, throttlerImpl.maxReplicationLagModule) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // run .add() and .MaxLag() concurrently to detect races + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, t *ThrottlerImpl, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + throttler.MaxLag(tabletType) + } + } + }(&wg, ctx, throttlerImpl, tabletType) + + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, throttler *ThrottlerImpl, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType) + require.NotNil(t, cache) + cache.add(replicationLagRecord{ + time: time.Now(), + TabletHealth: discovery.TabletHealth{ + Serving: true, + Stats: &query.RealtimeStats{ + ReplicationLagSeconds: 5, + }, + Tablet: &topodata.Tablet{ + Hostname: t.Name(), + Type: tabletType, + PortMap: map[string]int32{ + "test": 15999, + }, + }, + }, + }) 
+ } + } + }(&wg, ctx, throttlerImpl, tabletType) + } + time.Sleep(time.Second) + cancel() + wg.Wait() + + // check .MaxLag() + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) + } +} diff --git a/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go b/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go index 38fd9d76286..99b5a40e0ca 100644 --- a/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go +++ b/go/vt/throttler/throttlerclienttest/throttlerclient_testsuite.go @@ -73,7 +73,7 @@ func TestSuitePanics(t *testing.T, c throttlerclient.Client) { var throttlerNames = []string{"t1", "t2"} type testFixture struct { - throttlers []*throttler.Throttler + throttlers []throttler.Throttler } func (tf *testFixture) setUp() error { diff --git a/go/vt/throttler/throttlerlogz.go b/go/vt/throttler/throttlerlogz.go index 6952b34feec..b5ce5376108 100644 --- a/go/vt/throttler/throttlerlogz.go +++ b/go/vt/throttler/throttlerlogz.go @@ -152,7 +152,7 @@ func showThrottlerLog(w http.ResponseWriter, m *managerImpl, name string) { colorLevel = "high" } data := struct { - result + Result ColorLevel string }{r, colorLevel} diff --git a/go/vt/throttler/throttlerlogz_test.go b/go/vt/throttler/throttlerlogz_test.go index 82ebb77e7a1..d5d1ff62327 100644 --- a/go/vt/throttler/throttlerlogz_test.go +++ b/go/vt/throttler/throttlerlogz_test.go @@ -21,6 +21,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/stretchr/testify/require" ) func TestThrottlerlogzHandler_MissingSlash(t *testing.T) { @@ -55,7 +57,7 @@ func TestThrottlerlogzHandler(t *testing.T) { testcases := []struct { desc string - r result + r Result want string }{ { @@ -148,7 +150,9 @@ func TestThrottlerlogzHandler(t *testing.T) { request, _ := http.NewRequest("GET", "/throttlerlogz/t1", nil) response := httptest.NewRecorder() - 
f.t1.maxReplicationLagModule.results.add(tc.r) + throttler, ok := f.t1.(*ThrottlerImpl) + require.True(t, ok) + throttler.maxReplicationLagModule.results.add(tc.r) throttlerlogzHandler(response, request, f.m) got := response.Body.String() diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index a599b5a0edd..462ffdd239f 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -163,7 +163,7 @@ func InitVtctld(ts *topo.Server) error { if err != nil { log.Errorf("Failed to get the list of known cells, failed to instantiate the healthcheck at startup: %v", err) } else { - healthCheck = discovery.NewHealthCheck(ctx, *vtctl.HealthcheckRetryDelay, *vtctl.HealthCheckTimeout, ts, localCell, strings.Join(cells, ",")) + healthCheck = discovery.NewHealthCheck(ctx, *vtctl.HealthcheckRetryDelay, *vtctl.HealthCheckTimeout, ts, localCell, strings.Join(cells, ","), nil) } } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 6ad595b5202..89431a43212 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -100,7 +100,11 @@ type TabletGateway struct { } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { - return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch) + filters, err := discovery.NewVTGateHealthCheckFilters() + if err != nil { + log.Exit(err) + } + return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch, filters) } // NewTabletGateway creates and returns a new TabletGateway diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index aeb75d258a3..327a37dc43f 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. 
-// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: ThrottlerInterface) +// Source: vitess.io/vitess/go/vt/throttler (interfaces: Throttler) // Package txthrottler is a generated GoMock package. package txthrottler @@ -13,45 +13,46 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" topodata "vitess.io/vitess/go/vt/proto/topodata" + throttler "vitess.io/vitess/go/vt/throttler" ) -// MockThrottlerInterface is a mock of ThrottlerInterface interface. -type MockThrottlerInterface struct { +// MockThrottler is a mock of Throttler interface. +type MockThrottler struct { ctrl *gomock.Controller - recorder *MockThrottlerInterfaceMockRecorder + recorder *MockThrottlerMockRecorder } -// MockThrottlerInterfaceMockRecorder is the mock recorder for MockThrottlerInterface. -type MockThrottlerInterfaceMockRecorder struct { - mock *MockThrottlerInterface +// MockThrottlerMockRecorder is the mock recorder for MockThrottler. +type MockThrottlerMockRecorder struct { + mock *MockThrottler } -// NewMockThrottlerInterface creates a new mock instance. -func NewMockThrottlerInterface(ctrl *gomock.Controller) *MockThrottlerInterface { - mock := &MockThrottlerInterface{ctrl: ctrl} - mock.recorder = &MockThrottlerInterfaceMockRecorder{mock} +// NewMockThrottler creates a new mock instance. +func NewMockThrottler(ctrl *gomock.Controller) *MockThrottler { + mock := &MockThrottler{ctrl: ctrl} + mock.recorder = &MockThrottlerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockThrottlerInterface) EXPECT() *MockThrottlerInterfaceMockRecorder { +func (m *MockThrottler) EXPECT() *MockThrottlerMockRecorder { return m.recorder } // Close mocks base method. -func (m *MockThrottlerInterface) Close() { +func (m *MockThrottler) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. 
-func (mr *MockThrottlerInterfaceMockRecorder) Close() *gomock.Call { +func (mr *MockThrottlerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockThrottlerInterface)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockThrottler)(nil).Close)) } // GetConfiguration mocks base method. -func (m *MockThrottlerInterface) GetConfiguration() *throttlerdata.Configuration { +func (m *MockThrottler) GetConfiguration() *throttlerdata.Configuration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetConfiguration") ret0, _ := ret[0].(*throttlerdata.Configuration) @@ -59,27 +60,41 @@ func (m *MockThrottlerInterface) GetConfiguration() *throttlerdata.Configuration } // GetConfiguration indicates an expected call of GetConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { +func (mr *MockThrottlerMockRecorder) GetConfiguration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottler)(nil).GetConfiguration)) +} + +// Log mocks base method. +func (m *MockThrottler) Log() []throttler.Result { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Log") + ret0, _ := ret[0].([]throttler.Result) + return ret0 +} + +// Log indicates an expected call of Log. +func (mr *MockThrottlerMockRecorder) Log() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockThrottler)(nil).Log)) } // MaxLag mocks base method. 
-func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { +func (m *MockThrottler) MaxLag(arg0 topodata.TabletType) uint32 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret := m.ctrl.Call(m, "MaxLag", arg0) ret0, _ := ret[0].(uint32) return ret0 } -// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. -func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { +// MaxLag indicates an expected call of MaxLag. +func (mr *MockThrottlerMockRecorder) MaxLag(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottler)(nil).MaxLag), arg0) } // MaxRate mocks base method. -func (m *MockThrottlerInterface) MaxRate() int64 { +func (m *MockThrottler) MaxRate() int64 { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MaxRate") ret0, _ := ret[0].(int64) @@ -87,61 +102,61 @@ func (m *MockThrottlerInterface) MaxRate() int64 { } // MaxRate indicates an expected call of MaxRate. -func (mr *MockThrottlerInterfaceMockRecorder) MaxRate() *gomock.Call { +func (mr *MockThrottlerMockRecorder) MaxRate() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxRate)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxRate", reflect.TypeOf((*MockThrottler)(nil).MaxRate)) } // RecordReplicationLag mocks base method. -func (m *MockThrottlerInterface) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { +func (m *MockThrottler) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { m.ctrl.T.Helper() m.ctrl.Call(m, "RecordReplicationLag", arg0, arg1) } // RecordReplicationLag indicates an expected call of RecordReplicationLag. 
-func (mr *MockThrottlerInterfaceMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottlerInterface)(nil).RecordReplicationLag), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottler)(nil).RecordReplicationLag), arg0, arg1) } // ResetConfiguration mocks base method. -func (m *MockThrottlerInterface) ResetConfiguration() { +func (m *MockThrottler) ResetConfiguration() { m.ctrl.T.Helper() m.ctrl.Call(m, "ResetConfiguration") } // ResetConfiguration indicates an expected call of ResetConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) ResetConfiguration() *gomock.Call { +func (mr *MockThrottlerMockRecorder) ResetConfiguration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).ResetConfiguration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetConfiguration", reflect.TypeOf((*MockThrottler)(nil).ResetConfiguration)) } // SetMaxRate mocks base method. -func (m *MockThrottlerInterface) SetMaxRate(arg0 int64) { +func (m *MockThrottler) SetMaxRate(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetMaxRate", arg0) } // SetMaxRate indicates an expected call of SetMaxRate. 
-func (mr *MockThrottlerInterfaceMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).SetMaxRate), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottler)(nil).SetMaxRate), arg0) } // ThreadFinished mocks base method. -func (m *MockThrottlerInterface) ThreadFinished(arg0 int) { +func (m *MockThrottler) ThreadFinished(arg0 int) { m.ctrl.T.Helper() m.ctrl.Call(m, "ThreadFinished", arg0) } // ThreadFinished indicates an expected call of ThreadFinished. -func (mr *MockThrottlerInterfaceMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottlerInterface)(nil).ThreadFinished), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottler)(nil).ThreadFinished), arg0) } // Throttle mocks base method. -func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { +func (m *MockThrottler) Throttle(arg0 int) time.Duration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Throttle", arg0) ret0, _ := ret[0].(time.Duration) @@ -149,13 +164,13 @@ func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { } // Throttle indicates an expected call of Throttle. 
-func (mr *MockThrottlerInterfaceMockRecorder) Throttle(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) Throttle(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottlerInterface)(nil).Throttle), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottler)(nil).Throttle), arg0) } // UpdateConfiguration mocks base method. -func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { +func (m *MockThrottler) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateConfiguration", arg0, arg1) ret0, _ := ret[0].(error) @@ -163,7 +178,7 @@ func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configu } // UpdateConfiguration indicates an expected call of UpdateConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).UpdateConfiguration), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottler)(nil).UpdateConfiguration), arg0, arg1) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 8147fcae4bc..70d92aad3a7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -36,15 +36,14 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" - throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" 
topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // These vars store the functions used to create the topo server, healthcheck, // and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. -type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) +type healthCheckFactoryFunc func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) var ( healthCheckFactory healthCheckFactoryFunc @@ -52,10 +51,15 @@ var ( ) func resetTxThrottlerFactories() { - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) + healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) { + // discovery.NewFilterByShard expects a single-shard filter to be in "keyspace|shard" format. 
+ filter, err := discovery.NewFilterByShard([]string{keyspace + "|" + shard}) + if err != nil { + return nil, err + } + return discovery.NewHealthCheck(ctx, discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","), filter), nil } - throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) } } @@ -72,29 +76,6 @@ type TxThrottler interface { Throttle(priority int, workload string) (result bool) } -// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler -// It is only used here to allow mocking out a throttler object. -type ThrottlerInterface interface { - Throttle(threadID int) time.Duration - ThreadFinished(threadID int) - Close() - MaxRate() int64 - SetMaxRate(rate int64) - RecordReplicationLag(time time.Time, th *discovery.TabletHealth) - GetConfiguration() *throttlerdatapb.Configuration - UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error - ResetConfiguration() - MaxLag(tabletType topodatapb.TabletType) uint32 -} - -// TopologyWatcherInterface defines the public interface that is implemented by -// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out -// go/vt/discovery.LegacyTopologyWatcher. -type TopologyWatcherInterface interface { - Start() - Stop() -} - // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with // go/vt/throttler.GlobalManager. 
const TxThrottlerName = "TransactionThrottler" @@ -168,9 +149,11 @@ type txThrottlerStateImpl struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler throttler.Throttler + + ctx context.Context + cancel context.CancelFunc healthCheck discovery.HealthCheck healthCheckChan chan *discovery.TabletHealth @@ -296,7 +279,10 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes[tabletType] = true } + ctx, cancel := context.WithCancel(context.Background()) state := &txThrottlerStateImpl{ + ctx: ctx, + cancel: cancel, config: config, healthCheckCells: config.TxThrottlerHealthCheckCells, tabletTypes: tabletTypes, @@ -307,38 +293,42 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi // get cells from topo if none defined in tabletenv config if len(state.healthCheckCells) == 0 { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cellsCancel() + state.healthCheckCells = fetchKnownCells(cellsCtx, txThrottler.topoServer, target) state.cellsFromTopo = true } - ctx, cancel := context.WithCancel(context.Background()) - state.stopHealthCheck = cancel - state.initHealthCheckStream(txThrottler.topoServer, target) - go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + if err := state.initHealthCheckStream(txThrottler.topoServer, target); err != nil { + return nil, err + } + state.healthCheck.RegisterStats() + go state.healthChecksProcessor(txThrottler.topoServer, target) state.waitForTermination.Add(1) go state.updateMaxLag() return state, nil } -func (ts 
*txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { - ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) (err error) { + ts.healthCheck, err = healthCheckFactory(ts.ctx, topoServer, target.Cell, target.Keyspace, target.Shard, ts.healthCheckCells) + if err != nil { + return err + } ts.healthCheckChan = ts.healthCheck.Subscribe() - + return nil } func (ts *txThrottlerStateImpl) closeHealthCheckStream() { if ts.healthCheck == nil { return } - ts.stopHealthCheck() + ts.cancel() ts.healthCheck.Close() } -func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { - fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) +func (ts *txThrottlerStateImpl) updateHealthCheckCells(topoServer *topo.Server, target *querypb.Target) error { + fetchCtx, cancel := context.WithTimeout(ts.ctx, topo.RemoteOperationTimeout) defer cancel() knownCells := fetchKnownCells(fetchCtx, topoServer, target) @@ -346,11 +336,12 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells ts.closeHealthCheckStream() - ts.initHealthCheckStream(topoServer, target) + return ts.initHealthCheckStream(topoServer, target) } + return nil } -func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, target *querypb.Target) { var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) @@ -359,10 +350,12 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS } for { select { 
- case <-ctx.Done(): + case <-ts.ctx.Done(): return case <-cellsUpdateTicks: - ts.updateHealthCheckCells(ctx, topoServer, target) + if err := ts.updateHealthCheckCells(topoServer, target); err != nil { + log.Errorf("txThrottler: failed to update cell list: %+v", err) + } case th := <-ts.healthCheckChan: ts.StatsUpdate(th) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index de50f32378d..c595224cb81 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -18,7 +18,7 @@ package txthrottler // Commands to generate the mocks for this test. //go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck -//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface +//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/throttler Throttler import ( "context" @@ -66,14 +66,17 @@ func TestEnabledThrottler(t *testing.T) { mockHealthCheck := NewMockHealthCheck(mockCtrl) hcCall1 := mockHealthCheck.EXPECT().Subscribe() hcCall1.Do(func() {}) - hcCall2 := mockHealthCheck.EXPECT().Close() + hcCall2 := mockHealthCheck.EXPECT().RegisterStats() + hcCall2.Do(func() {}) hcCall2.After(hcCall1) - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return mockHealthCheck + hcCall3 := mockHealthCheck.EXPECT().Close() + hcCall3.After(hcCall2) + healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) { + return mockHealthCheck, nil } - mockThrottler := NewMockThrottlerInterface(mockCtrl) - throttlerFactory = 
func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + mockThrottler := NewMockThrottler(mockCtrl) + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) { assert.Equal(t, 1, threadCount) return mockThrottler, nil } From 2e80efb3a595f32f345f4de2b1e86a5beb15e365 Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Wed, 28 Aug 2024 11:10:35 -0500 Subject: [PATCH 4/4] ignore unused wrangler weighted semaphore --- go/vt/wrangler/wrangler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/wrangler/wrangler.go b/go/vt/wrangler/wrangler.go index 1bf3763f7f6..8c3c3c46690 100644 --- a/go/vt/wrangler/wrangler.go +++ b/go/vt/wrangler/wrangler.go @@ -22,6 +22,7 @@ import ( "context" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" @@ -55,6 +56,7 @@ type Wrangler struct { // DO NOT USE in production code. VExecFunc func(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) // Limt the number of concurrent background goroutines if needed. + // nolint:ignore U1000 sem *semaphore.Weighted }