From 11c63b2e2a5d5e26cf157ac82891ca41118e01ec Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 18 Jul 2024 00:47:02 +0200 Subject: [PATCH] `txthrottler`: move `ThrottlerInterface` to `go/vt/throttler`, use `slices` pkg, add stats (#16248) Signed-off-by: Tim Vaillancourt Signed-off-by: Florent Poinsard --- go/vt/throttler/demo/throttler_demo.go | 4 +- go/vt/throttler/manager.go | 10 +- go/vt/throttler/manager_test.go | 2 +- go/vt/throttler/max_replication_lag_module.go | 20 ++-- go/vt/throttler/result.go | 24 ++-- go/vt/throttler/result_test.go | 20 ++-- go/vt/throttler/throttler.go | 56 ++++++---- go/vt/throttler/throttler_test.go | 12 +- go/vt/throttler/throttlerlogz.go | 2 +- go/vt/throttler/throttlerlogz_test.go | 6 +- .../txthrottler/mock_throttler_test.go | 103 ++++++++++-------- .../tabletserver/txthrottler/tx_throttler.go | 28 +---- .../txthrottler/tx_throttler_test.go | 11 +- 13 files changed, 161 insertions(+), 137 deletions(-) diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go index cd2a4e11307..032b1fae3ca 100644 --- a/go/vt/throttler/demo/throttler_demo.go +++ b/go/vt/throttler/demo/throttler_demo.go @@ -103,7 +103,7 @@ type replica struct { // throttler is used to enforce the maximum rate at which replica applies // transactions. It must not be confused with the client's throttler. - throttler *throttler.Throttler + throttler throttler.Throttler lastHealthUpdate time.Time lagUpdateInterval time.Duration @@ -226,7 +226,7 @@ type client struct { primary *primary healthCheck discovery.HealthCheck - throttler *throttler.Throttler + throttler throttler.Throttler stopChan chan struct{} wg sync.WaitGroup diff --git a/go/vt/throttler/manager.go b/go/vt/throttler/manager.go index c2ee9f0a652..ee142190f75 100644 --- a/go/vt/throttler/manager.go +++ b/go/vt/throttler/manager.go @@ -64,16 +64,16 @@ type managerImpl struct { // mu guards all fields in this group. mu sync.Mutex // throttlers tracks all running throttlers (by their name). - throttlers map[string]*Throttler + throttlers map[string]Throttler } func newManager() *managerImpl { return &managerImpl{ - throttlers: make(map[string]*Throttler), + throttlers: make(map[string]Throttler), } } -func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error { +func (m *managerImpl) registerThrottler(name string, throttler Throttler) error { m.mu.Lock() defer m.mu.Unlock() @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string { // log returns the most recent changes of the MaxReplicationLag module. // There will be one result for each processed replication lag record. -func (m *managerImpl) log(throttlerName string) ([]result, error) { +func (m *managerImpl) log(throttlerName string) ([]Result, error) { m.mu.Lock() defer m.mu.Unlock() @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) { return nil, fmt.Errorf("throttler: %v does not exist", throttlerName) } - return t.log(), nil + return t.Log(), nil } diff --git a/go/vt/throttler/manager_test.go b/go/vt/throttler/manager_test.go index 3d61d4d6b68..21b6ba2e67d 100644 --- a/go/vt/throttler/manager_test.go +++ b/go/vt/throttler/manager_test.go @@ -37,7 +37,7 @@ var ( type managerTestFixture struct { m *managerImpl - t1, t2 *Throttler + t1, t2 Throttler } func (f *managerTestFixture) setUp() error { diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index ac184fe7be8..6f86a50400f 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -313,7 +313,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec m.memory.ageBadRate(now) - r := result{ + r := Result{ Now: now, RateChange: unchangedRate, lastRateChange: m.lastRateChange, @@ -446,7 +446,7 @@ func stateGreater(a, b state) bool { // and we should not skip the current replica ("lagRecordNow"). // Even if it's the same replica we may skip it and return false because // we want to wait longer for the propagation of the current rate change. -func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { +func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { if m.replicaUnderTest == nil { return true } @@ -472,7 +472,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t return true } -func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown) oldRate := m.rate.Load() @@ -560,7 +560,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa return minDuration } -func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { // Guess replication rate based on the difference in the replication lag of this // particular replica. lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange) @@ -631,7 +631,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, // guessReplicationRate guesses the actual replication rate based on the new bac // Note that "lagDifference" can be positive (lag increased) or negative (lag // decreased). -func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { +func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { // avgReplicationRate is the average rate (per second) at which the replica // applied transactions from the replication stream. We infer the value // from the relative change in the replication lag. @@ -676,14 +676,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate return int64(newRate), reason } -func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown) decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec) m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason) } -func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { +func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { oldRate := m.rate.Load() rate := int64(float64(oldRate) - float64(oldRate)*decrease) if rate == 0 { @@ -695,7 +695,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases()) } -func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { +func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { oldRate := m.rate.Load() m.currentState = newState @@ -723,7 +723,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int // markCurrentRateAsBadOrGood determines the actual rate between the last rate // change and "now" and determines if that rate was bad or good. -func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) { +func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) { if m.lastRateChange.IsZero() { // Module was just started. We don't have any data points yet. r.GoodOrBad = ignoredRate @@ -797,6 +797,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time } } -func (m *MaxReplicationLagModule) log() []result { +func (m *MaxReplicationLagModule) log() []Result { return m.results.latestValues() } diff --git a/go/vt/throttler/result.go b/go/vt/throttler/result.go index 0976a180877..bd7a86b3b18 100644 --- a/go/vt/throttler/result.go +++ b/go/vt/throttler/result.go @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}} lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}} reason: {{.Reason}}`)) -// result is generated by the MaxReplicationLag module for each processed +// Result is generated by the MaxReplicationLag module for each processed // "replicationLagRecord". // It captures the details and the decision of the processing. -type result struct { +type Result struct { Now time.Time RateChange rateChange lastRateChange time.Time @@ -80,7 +80,7 @@ type result struct { GuessedReplicationBacklogNew int } -func (r result) String() string { +func (r Result) String() string { var b strings.Builder if err := resultStringTemplate.Execute(&b, r); err != nil { panic(fmt.Sprintf("failed to Execute() template: %v", err)) @@ -88,25 +88,25 @@ func (r result) String() string { return b.String() } -func (r result) Alias() string { +func (r Result) Alias() string { return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias) } -func (r result) TimeSinceLastRateChange() string { +func (r Result) TimeSinceLastRateChange() string { if r.lastRateChange.IsZero() { return "n/a" } return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds()) } -func (r result) LagBefore() string { +func (r Result) LagBefore() string { if r.LagRecordBefore.isZero() { return "n/a" } return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds) } -func (r result) AgeOfBeforeLag() string { +func (r Result) AgeOfBeforeLag() string { if r.LagRecordBefore.isZero() { return "n/a" } @@ -123,18 +123,18 @@ type resultRing struct { // started reusing entries. wrapped bool // values is the underlying ring buffer. - values []result + values []Result } // newResultRing creates a new resultRing. func newResultRing(capacity int) *resultRing { return &resultRing{ - values: make([]result, capacity), + values: make([]Result, capacity), } } // add inserts a new result into the ring buffer. -func (rr *resultRing) add(r result) { +func (rr *resultRing) add(r Result) { rr.mu.Lock() defer rr.mu.Unlock() @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) { // latestValues returns all values of the buffer. Entries are sorted in reverse // chronological order i.e. newer items come first. -func (rr *resultRing) latestValues() []result { +func (rr *resultRing) latestValues() []Result { rr.mu.Lock() defer rr.mu.Unlock() @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result { count = rr.position } - results := make([]result, count) + results := make([]Result, count) for i := 0; i < count; i++ { pos := start - i if pos < 0 { diff --git a/go/vt/throttler/result_test.go b/go/vt/throttler/result_test.go index 8cc5357ef7b..8bab6d9296a 100644 --- a/go/vt/throttler/result_test.go +++ b/go/vt/throttler/result_test.go @@ -24,7 +24,7 @@ import ( ) var ( - resultIncreased = result{ + resultIncreased = Result{ Now: sinceZero(1234 * time.Millisecond), RateChange: increasedRate, lastRateChange: sinceZero(1 * time.Millisecond), @@ -46,7 +46,7 @@ var ( GuessedReplicationBacklogOld: 0, GuessedReplicationBacklogNew: 0, } - resultDecreased = result{ + resultDecreased = Result{ Now: sinceZero(5000 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(1234 * time.Millisecond), @@ -68,7 +68,7 @@ var ( GuessedReplicationBacklogOld: 10, GuessedReplicationBacklogNew: 20, } - resultEmergency = result{ + resultEmergency = Result{ Now: sinceZero(10123 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(5000 * time.Millisecond), @@ -94,7 +94,7 @@ var ( func TestResultString(t *testing.T) { testcases := []struct { - r result + r Result want string }{ { @@ -134,24 +134,24 @@ reason: emergency state decreased the rate`, func TestResultRing(t *testing.T) { // Test data. - r1 := result{Reason: "r1"} - r2 := result{Reason: "r2"} - r3 := result{Reason: "r3"} + r1 := Result{Reason: "r1"} + r2 := Result{Reason: "r2"} + r3 := Result{Reason: "r3"} rr := newResultRing(2) // Use the ring partially. rr.add(r1) - got, want := rr.latestValues(), []result{r1} + got, want := rr.latestValues(), []Result{r1} require.Equal(t, want, got, "items not correctly added to resultRing") // Use it fully. rr.add(r2) - got, want = rr.latestValues(), []result{r2, r1} + got, want = rr.latestValues(), []Result{r2, r1} require.Equal(t, want, got, "items not correctly added to resultRing") // Let it wrap. rr.add(r3) - got, want = rr.latestValues(), []result{r3, r2} + got, want = rr.latestValues(), []Result{r3, r2} require.Equal(t, want, got, "resultRing did not wrap correctly") } diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 909888bd0d4..a12d8f241a2 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/proto/topodata" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -66,7 +67,22 @@ const ( InvalidMaxReplicationLag = -1 ) -// Throttler provides a client-side, thread-aware throttler. +// Throttler defines the throttler interface. +type Throttler interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, th *discovery.TabletHealth) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() + MaxLag(tabletType topodatapb.TabletType) uint32 + Log() []Result +} + +// ThrottlerImpl implements a client-side, thread-aware throttler. // See the package doc for more information. // // Calls of Throttle() and ThreadFinished() take threadID as parameter which is @@ -74,7 +90,7 @@ const ( // NOTE: Trottle() and ThreadFinished() assume that *per thread* calls to them // // are serialized and must not happen concurrently. -type Throttler struct { +type ThrottlerImpl struct { // name describes the Throttler instance and is used e.g. in the webinterface. name string // unit describes the entity the throttler is limiting e.g. "queries" or @@ -127,15 +143,15 @@ type Throttler struct { // unit refers to the type of entity you want to throttle e.g. "queries" or // "transactions". // name describes the Throttler instance and will be used by the webinterface. -func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (*Throttler, error) { +func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (Throttler, error) { return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now) } -func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { +func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error) { return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc) } -func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { +func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (Throttler, error) { config := NewMaxReplicationLagModuleConfig(maxReplicationLag) config.MaxReplicationLagSec = maxReplicationLag @@ -143,7 +159,7 @@ func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxR } -func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) { +func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (Throttler, error) { err := maxReplicationLagModuleConfig.Verify() if err != nil { return nil, fmt.Errorf("invalid max replication lag config: %w", err) @@ -176,7 +192,7 @@ func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount threadThrottlers[i] = newThreadThrottler(i, actualRateHistory) runningThreads[i] = true } - t := &Throttler{ + t := &ThrottlerImpl{ name: name, unit: unit, manager: manager, @@ -215,7 +231,7 @@ func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount // the backoff duration elapsed. // The maximum value for the returned backoff is 1 second since the throttler // internally operates on a per-second basis. -func (t *Throttler) Throttle(threadID int) time.Duration { +func (t *ThrottlerImpl) Throttle(threadID int) time.Duration { if t.closed { panic(fmt.Sprintf("BUG: thread with ID: %v must not access closed Throttler", threadID)) } @@ -227,7 +243,7 @@ func (t *Throttler) Throttle(threadID int) time.Duration { // MaxLag returns the max of all the last replication lag values seen across all tablets of // the provided type, excluding ignored tablets. -func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { +func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) var maxLag uint32 @@ -250,7 +266,7 @@ func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { // ThreadFinished marks threadID as finished and redistributes the thread's // rate allotment across the other threads. // After ThreadFinished() is called, Throttle() must not be called anymore. -func (t *Throttler) ThreadFinished(threadID int) { +func (t *ThrottlerImpl) ThreadFinished(threadID int) { if t.threadFinished[threadID] { panic(fmt.Sprintf("BUG: thread with ID: %v already finished", threadID)) } @@ -265,7 +281,7 @@ func (t *Throttler) ThreadFinished(threadID int) { // Close stops all modules and frees all resources. // When Close() returned, the Throttler object must not be used anymore. -func (t *Throttler) Close() { +func (t *ThrottlerImpl) Close() { for _, m := range t.modules { m.Stop() } @@ -278,7 +294,7 @@ func (t *Throttler) Close() { // threadThrottlers accordingly. // The rate changes when the number of thread changes or a module updated its // max rate. -func (t *Throttler) updateMaxRate() { +func (t *ThrottlerImpl) updateMaxRate() { // Set it to infinite initially. maxRate := int64(math.MaxInt64) @@ -319,39 +335,39 @@ func (t *Throttler) updateMaxRate() { } // MaxRate returns the current rate of the MaxRateModule. -func (t *Throttler) MaxRate() int64 { +func (t *ThrottlerImpl) MaxRate() int64 { return t.maxRateModule.MaxRate() } // SetMaxRate updates the rate of the MaxRateModule. -func (t *Throttler) SetMaxRate(rate int64) { +func (t *ThrottlerImpl) SetMaxRate(rate int64) { t.maxRateModule.SetMaxRate(rate) } // RecordReplicationLag must be called by users to report the "ts" tablet health // data observed at "time". // Note: After Close() is called, this method must not be called anymore. -func (t *Throttler) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { +func (t *ThrottlerImpl) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) { t.maxReplicationLagModule.RecordReplicationLag(time, th) } // GetConfiguration returns the configuration of the MaxReplicationLag module. -func (t *Throttler) GetConfiguration() *throttlerdatapb.Configuration { +func (t *ThrottlerImpl) GetConfiguration() *throttlerdatapb.Configuration { return t.maxReplicationLagModule.getConfiguration() } // UpdateConfiguration updates the configuration of the MaxReplicationLag module. -func (t *Throttler) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error { +func (t *ThrottlerImpl) UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error { return t.maxReplicationLagModule.updateConfiguration(configuration, copyZeroValues) } // ResetConfiguration resets the configuration of the MaxReplicationLag module // to its initial settings. -func (t *Throttler) ResetConfiguration() { +func (t *ThrottlerImpl) ResetConfiguration() { t.maxReplicationLagModule.resetConfiguration() } -// log returns the most recent changes of the MaxReplicationLag module. -func (t *Throttler) log() []result { +// Log returns the most recent changes of the MaxReplicationLag module. +func (t *ThrottlerImpl) Log() []Result { return t.maxReplicationLagModule.log() } diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index b33bb2ca255..74e8784f021 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -163,7 +163,7 @@ func sinceZero(sinceZero time.Duration) time.Time { // threadThrottler.newThreadThrottler() for more details. // newThrottlerWithClock should only be used for testing. -func newThrottlerWithClock(name, unit string, threadCount int, maxRate int64, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) { +func newThrottlerWithClock(name, unit string, threadCount int, maxRate int64, maxReplicationLag int64, nowFunc func() time.Time) (Throttler, error) { return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, nowFunc) } @@ -264,14 +264,16 @@ func TestThreadFinished(t *testing.T) { // Max rate update to threadThrottlers happens asynchronously. Wait for it. timer := time.NewTimer(2 * time.Second) + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) for { - if throttler.threadThrottlers[0].getMaxRate() == 2 { + if throttlerImpl.threadThrottlers[0].getMaxRate() == 2 { timer.Stop() break } select { case <-timer.C: - t.Fatalf("max rate was not propapgated to threadThrottler[0] in time: %v", throttler.threadThrottlers[0].getMaxRate()) + t.Fatalf("max rate was not propapgated to threadThrottler[0] in time: %v", throttlerImpl.threadThrottlers[0].getMaxRate()) default: // Timer not up yet. Try again. } @@ -369,7 +371,9 @@ func TestUpdateMaxRate_AllThreadsFinished(t *testing.T) { throttler.ThreadFinished(1) // Make sure that there's no division by zero error (threadsRunning == 0). - throttler.updateMaxRate() + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) + throttlerImpl.updateMaxRate() // We don't care about the Throttler state at this point. } diff --git a/go/vt/throttler/throttlerlogz.go b/go/vt/throttler/throttlerlogz.go index 4023dcd7e68..8cace8ce5c4 100644 --- a/go/vt/throttler/throttlerlogz.go +++ b/go/vt/throttler/throttlerlogz.go @@ -159,7 +159,7 @@ func showThrottlerLog(w http.ResponseWriter, m *managerImpl, name string) { colorLevel = "high" } data := struct { - result + Result ColorLevel string }{r, colorLevel} diff --git a/go/vt/throttler/throttlerlogz_test.go b/go/vt/throttler/throttlerlogz_test.go index 22927f3f201..78151fc750b 100644 --- a/go/vt/throttler/throttlerlogz_test.go +++ b/go/vt/throttler/throttlerlogz_test.go @@ -54,7 +54,7 @@ func TestThrottlerlogzHandler(t *testing.T) { testcases := []struct { desc string - r result + r Result want string }{ { @@ -147,7 +147,9 @@ func TestThrottlerlogzHandler(t *testing.T) { request, _ := http.NewRequest("GET", "/throttlerlogz/t1", nil) response := httptest.NewRecorder() - f.t1.maxReplicationLagModule.results.add(tc.r) + throttler, ok := f.t1.(*ThrottlerImpl) + require.True(t, ok) + throttler.maxReplicationLagModule.results.add(tc.r) throttlerlogzHandler(response, request, f.m) got := response.Body.String() diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index aeb75d258a3..327a37dc43f 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: ThrottlerInterface) +// Source: vitess.io/vitess/go/vt/throttler (interfaces: Throttler) // Package txthrottler is a generated GoMock package. package txthrottler @@ -13,45 +13,46 @@ import ( discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" topodata "vitess.io/vitess/go/vt/proto/topodata" + throttler "vitess.io/vitess/go/vt/throttler" ) -// MockThrottlerInterface is a mock of ThrottlerInterface interface. -type MockThrottlerInterface struct { +// MockThrottler is a mock of Throttler interface. +type MockThrottler struct { ctrl *gomock.Controller - recorder *MockThrottlerInterfaceMockRecorder + recorder *MockThrottlerMockRecorder } -// MockThrottlerInterfaceMockRecorder is the mock recorder for MockThrottlerInterface. -type MockThrottlerInterfaceMockRecorder struct { - mock *MockThrottlerInterface +// MockThrottlerMockRecorder is the mock recorder for MockThrottler. +type MockThrottlerMockRecorder struct { + mock *MockThrottler } -// NewMockThrottlerInterface creates a new mock instance. -func NewMockThrottlerInterface(ctrl *gomock.Controller) *MockThrottlerInterface { - mock := &MockThrottlerInterface{ctrl: ctrl} - mock.recorder = &MockThrottlerInterfaceMockRecorder{mock} +// NewMockThrottler creates a new mock instance. +func NewMockThrottler(ctrl *gomock.Controller) *MockThrottler { + mock := &MockThrottler{ctrl: ctrl} + mock.recorder = &MockThrottlerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockThrottlerInterface) EXPECT() *MockThrottlerInterfaceMockRecorder { +func (m *MockThrottler) EXPECT() *MockThrottlerMockRecorder { return m.recorder } // Close mocks base method. -func (m *MockThrottlerInterface) Close() { +func (m *MockThrottler) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. -func (mr *MockThrottlerInterfaceMockRecorder) Close() *gomock.Call { +func (mr *MockThrottlerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockThrottlerInterface)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockThrottler)(nil).Close)) } // GetConfiguration mocks base method. -func (m *MockThrottlerInterface) GetConfiguration() *throttlerdata.Configuration { +func (m *MockThrottler) GetConfiguration() *throttlerdata.Configuration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetConfiguration") ret0, _ := ret[0].(*throttlerdata.Configuration) @@ -59,27 +60,41 @@ func (m *MockThrottlerInterface) GetConfiguration() *throttlerdata.Configuration } // GetConfiguration indicates an expected call of GetConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { +func (mr *MockThrottlerMockRecorder) GetConfiguration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottler)(nil).GetConfiguration)) +} + +// Log mocks base method. +func (m *MockThrottler) Log() []throttler.Result { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Log") + ret0, _ := ret[0].([]throttler.Result) + return ret0 +} + +// Log indicates an expected call of Log. +func (mr *MockThrottlerMockRecorder) Log() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockThrottler)(nil).Log)) } // MaxLag mocks base method. -func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { +func (m *MockThrottler) MaxLag(arg0 topodata.TabletType) uint32 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MaxLag", tabletType) + ret := m.ctrl.Call(m, "MaxLag", arg0) ret0, _ := ret[0].(uint32) return ret0 } -// MaxLag indicates an expected call of LastMaxLagNotIgnoredForTabletType. -func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { +// MaxLag indicates an expected call of MaxLag. +func (mr *MockThrottlerMockRecorder) MaxLag(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottler)(nil).MaxLag), arg0) } // MaxRate mocks base method. -func (m *MockThrottlerInterface) MaxRate() int64 { +func (m *MockThrottler) MaxRate() int64 { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MaxRate") ret0, _ := ret[0].(int64) @@ -87,61 +102,61 @@ func (m *MockThrottlerInterface) MaxRate() int64 { } // MaxRate indicates an expected call of MaxRate. -func (mr *MockThrottlerInterfaceMockRecorder) MaxRate() *gomock.Call { +func (mr *MockThrottlerMockRecorder) MaxRate() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxRate)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxRate", reflect.TypeOf((*MockThrottler)(nil).MaxRate)) } // RecordReplicationLag mocks base method. -func (m *MockThrottlerInterface) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { +func (m *MockThrottler) RecordReplicationLag(arg0 time.Time, arg1 *discovery.TabletHealth) { m.ctrl.T.Helper() m.ctrl.Call(m, "RecordReplicationLag", arg0, arg1) } // RecordReplicationLag indicates an expected call of RecordReplicationLag. -func (mr *MockThrottlerInterfaceMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) RecordReplicationLag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottlerInterface)(nil).RecordReplicationLag), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordReplicationLag", reflect.TypeOf((*MockThrottler)(nil).RecordReplicationLag), arg0, arg1) } // ResetConfiguration mocks base method. -func (m *MockThrottlerInterface) ResetConfiguration() { +func (m *MockThrottler) ResetConfiguration() { m.ctrl.T.Helper() m.ctrl.Call(m, "ResetConfiguration") } // ResetConfiguration indicates an expected call of ResetConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) ResetConfiguration() *gomock.Call { +func (mr *MockThrottlerMockRecorder) ResetConfiguration() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).ResetConfiguration)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetConfiguration", reflect.TypeOf((*MockThrottler)(nil).ResetConfiguration)) } // SetMaxRate mocks base method. -func (m *MockThrottlerInterface) SetMaxRate(arg0 int64) { +func (m *MockThrottler) SetMaxRate(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetMaxRate", arg0) } // SetMaxRate indicates an expected call of SetMaxRate. -func (mr *MockThrottlerInterfaceMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) SetMaxRate(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottlerInterface)(nil).SetMaxRate), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMaxRate", reflect.TypeOf((*MockThrottler)(nil).SetMaxRate), arg0) } // ThreadFinished mocks base method. -func (m *MockThrottlerInterface) ThreadFinished(arg0 int) { +func (m *MockThrottler) ThreadFinished(arg0 int) { m.ctrl.T.Helper() m.ctrl.Call(m, "ThreadFinished", arg0) } // ThreadFinished indicates an expected call of ThreadFinished. -func (mr *MockThrottlerInterfaceMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) ThreadFinished(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottlerInterface)(nil).ThreadFinished), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ThreadFinished", reflect.TypeOf((*MockThrottler)(nil).ThreadFinished), arg0) } // Throttle mocks base method. -func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { +func (m *MockThrottler) Throttle(arg0 int) time.Duration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Throttle", arg0) ret0, _ := ret[0].(time.Duration) @@ -149,13 +164,13 @@ func (m *MockThrottlerInterface) Throttle(arg0 int) time.Duration { } // Throttle indicates an expected call of Throttle. -func (mr *MockThrottlerInterfaceMockRecorder) Throttle(arg0 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) Throttle(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottlerInterface)(nil).Throttle), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Throttle", reflect.TypeOf((*MockThrottler)(nil).Throttle), arg0) } // UpdateConfiguration mocks base method. -func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { +func (m *MockThrottler) UpdateConfiguration(arg0 *throttlerdata.Configuration, arg1 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateConfiguration", arg0, arg1) ret0, _ := ret[0].(error) @@ -163,7 +178,7 @@ func (m *MockThrottlerInterface) UpdateConfiguration(arg0 *throttlerdata.Configu } // UpdateConfiguration indicates an expected call of UpdateConfiguration. -func (mr *MockThrottlerInterfaceMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockThrottlerMockRecorder) UpdateConfiguration(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).UpdateConfiguration), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfiguration", reflect.TypeOf((*MockThrottler)(nil).UpdateConfiguration), arg0, arg1) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 7cb774663a4..18fafa8eb96 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -19,7 +19,7 @@ package txthrottler import ( "context" "math/rand/v2" - "reflect" + "slices" "strings" "sync" "sync/atomic" @@ -34,7 +34,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" - throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -42,7 +41,7 @@ import ( // and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) var ( healthCheckFactory healthCheckFactoryFunc @@ -53,7 +52,7 @@ func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) } - throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) } } @@ -70,21 +69,6 @@ type TxThrottler interface { Throttle(priority int, workload string) (result bool) } -// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler -// It is only used here to allow mocking out a throttler object. -type ThrottlerInterface interface { - Throttle(threadID int) time.Duration - ThreadFinished(threadID int) - Close() - MaxRate() int64 - SetMaxRate(rate int64) - RecordReplicationLag(time time.Time, th *discovery.TabletHealth) - GetConfiguration() *throttlerdatapb.Configuration - UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error - ResetConfiguration() - MaxLag(tabletType topodatapb.TabletType) uint32 -} - // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with // go/vt/throttler.GlobalManager. const TxThrottlerName = "TransactionThrottler" @@ -159,7 +143,7 @@ type txThrottlerStateImpl struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. throttleMu sync.Mutex - throttler ThrottlerInterface + throttler throttler.Throttler stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck @@ -304,6 +288,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi ctx, cancel := context.WithCancel(context.Background()) state.stopHealthCheck = cancel state.initHealthCheckStream(txThrottler.topoServer, target) + state.healthCheck.RegisterStats() go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) state.waitForTermination.Add(1) go state.updateMaxLag() @@ -314,7 +299,6 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) ts.healthCheckChan = ts.healthCheck.Subscribe() - } func (ts *txThrottlerStateImpl) closeHealthCheckStream() { @@ -330,7 +314,7 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo defer cancel() knownCells := fetchKnownCells(fetchCtx, topoServer, target) - if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { + if !slices.Equal(knownCells, ts.healthCheckCells) { log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells ts.closeHealthCheckStream() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index fe352cf96f4..25c855b898b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -18,7 +18,7 @@ package txthrottler // Commands to generate the mocks for this test. //go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck -//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface +//go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/throttler Throttler import ( "context" @@ -69,14 +69,17 @@ func TestEnabledThrottler(t *testing.T) { mockHealthCheck := NewMockHealthCheck(mockCtrl) hcCall1 := mockHealthCheck.EXPECT().Subscribe() hcCall1.Do(func() {}) - hcCall2 := mockHealthCheck.EXPECT().Close() + hcCall2 := mockHealthCheck.EXPECT().RegisterStats() + hcCall2.Do(func() {}) hcCall2.After(hcCall1) + hcCall3 := mockHealthCheck.EXPECT().Close() + hcCall3.After(hcCall2) healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return mockHealthCheck } - mockThrottler := NewMockThrottlerInterface(mockCtrl) - throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + mockThrottler := NewMockThrottler(mockCtrl) + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (throttler.Throttler, error) { assert.Equal(t, 1, threadCount) return mockThrottler, nil }