diff --git a/go/vt/throttler/manager.go b/go/vt/throttler/manager.go index 8c56d6e167a..ee142190f75 100644 --- a/go/vt/throttler/manager.go +++ b/go/vt/throttler/manager.go @@ -17,7 +17,6 @@ limitations under the License. package throttler import ( - "errors" "fmt" "sort" "sync" @@ -208,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string { // log returns the most recent changes of the MaxReplicationLag module. // There will be one result for each processed replication lag record. -func (m *managerImpl) log(throttlerName string) ([]result, error) { +func (m *managerImpl) log(throttlerName string) ([]Result, error) { m.mu.Lock() defer m.mu.Unlock() @@ -217,9 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) { return nil, fmt.Errorf("throttler: %v does not exist", throttlerName) } - throttlerImpl, ok := t.(*ThrottlerImpl) - if !ok { - return nil, errors.New("unexpected throttler implementation") - } - return throttlerImpl.log(), nil + return t.Log(), nil } diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index ac184fe7be8..6f86a50400f 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -313,7 +313,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec m.memory.ageBadRate(now) - r := result{ + r := Result{ Now: now, RateChange: unchangedRate, lastRateChange: m.lastRateChange, @@ -446,7 +446,7 @@ func stateGreater(a, b state) bool { // and we should not skip the current replica ("lagRecordNow"). // Even if it's the same replica we may skip it and return false because // we want to wait longer for the propagation of the current rate change. -func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { +func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool { if m.replicaUnderTest == nil { return true } @@ -472,7 +472,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t return true } -func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown) oldRate := m.rate.Load() @@ -560,7 +560,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa return minDuration } -func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) { // Guess replication rate based on the difference in the replication lag of this // particular replica. lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange) @@ -631,7 +631,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, // guessReplicationRate guesses the actual replication rate based on the new bac // Note that "lagDifference" can be positive (lag increased) or negative (lag // decreased). -func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { +func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) { // avgReplicationRate is the average rate (per second) at which the replica // applied transactions from the replication stream. We infer the value // from the relative change in the replication lag. @@ -676,14 +676,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate return int64(newRate), reason } -func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) { +func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) { m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown) decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec) m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason) } -func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { +func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) { oldRate := m.rate.Load() rate := int64(float64(oldRate) - float64(oldRate)*decrease) if rate == 0 { @@ -695,7 +695,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases()) } -func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { +func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) { oldRate := m.rate.Load() m.currentState = newState @@ -723,7 +723,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int // markCurrentRateAsBadOrGood determines the actual rate between the last rate // change and "now" and determines if that rate was bad or good. -func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) { +func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) { if m.lastRateChange.IsZero() { // Module was just started. We don't have any data points yet. r.GoodOrBad = ignoredRate @@ -797,6 +797,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time } } -func (m *MaxReplicationLagModule) log() []result { +func (m *MaxReplicationLagModule) log() []Result { return m.results.latestValues() } diff --git a/go/vt/throttler/result.go b/go/vt/throttler/result.go index 0976a180877..bd7a86b3b18 100644 --- a/go/vt/throttler/result.go +++ b/go/vt/throttler/result.go @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}} lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}} reason: {{.Reason}}`)) -// result is generated by the MaxReplicationLag module for each processed +// Result is generated by the MaxReplicationLag module for each processed // "replicationLagRecord". // It captures the details and the decision of the processing. -type result struct { +type Result struct { Now time.Time RateChange rateChange lastRateChange time.Time @@ -80,7 +80,7 @@ type result struct { GuessedReplicationBacklogNew int } -func (r result) String() string { +func (r Result) String() string { var b strings.Builder if err := resultStringTemplate.Execute(&b, r); err != nil { panic(fmt.Sprintf("failed to Execute() template: %v", err)) @@ -88,25 +88,25 @@ func (r result) String() string { return b.String() } -func (r result) Alias() string { +func (r Result) Alias() string { return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias) } -func (r result) TimeSinceLastRateChange() string { +func (r Result) TimeSinceLastRateChange() string { if r.lastRateChange.IsZero() { return "n/a" } return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds()) } -func (r result) LagBefore() string { +func (r Result) LagBefore() string { if r.LagRecordBefore.isZero() { return "n/a" } return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds) } -func (r result) AgeOfBeforeLag() string { +func (r Result) AgeOfBeforeLag() string { if r.LagRecordBefore.isZero() { return "n/a" } @@ -123,18 +123,18 @@ type resultRing struct { // started reusing entries. wrapped bool // values is the underlying ring buffer. - values []result + values []Result } // newResultRing creates a new resultRing. func newResultRing(capacity int) *resultRing { return &resultRing{ - values: make([]result, capacity), + values: make([]Result, capacity), } } // add inserts a new result into the ring buffer. -func (rr *resultRing) add(r result) { +func (rr *resultRing) add(r Result) { rr.mu.Lock() defer rr.mu.Unlock() @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) { // latestValues returns all values of the buffer. Entries are sorted in reverse // chronological order i.e. newer items come first. -func (rr *resultRing) latestValues() []result { +func (rr *resultRing) latestValues() []Result { rr.mu.Lock() defer rr.mu.Unlock() @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result { count = rr.position } - results := make([]result, count) + results := make([]Result, count) for i := 0; i < count; i++ { pos := start - i if pos < 0 { diff --git a/go/vt/throttler/result_test.go b/go/vt/throttler/result_test.go index 8cc5357ef7b..8bab6d9296a 100644 --- a/go/vt/throttler/result_test.go +++ b/go/vt/throttler/result_test.go @@ -24,7 +24,7 @@ import ( ) var ( - resultIncreased = result{ + resultIncreased = Result{ Now: sinceZero(1234 * time.Millisecond), RateChange: increasedRate, lastRateChange: sinceZero(1 * time.Millisecond), @@ -46,7 +46,7 @@ var ( GuessedReplicationBacklogOld: 0, GuessedReplicationBacklogNew: 0, } - resultDecreased = result{ + resultDecreased = Result{ Now: sinceZero(5000 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(1234 * time.Millisecond), @@ -68,7 +68,7 @@ var ( GuessedReplicationBacklogOld: 10, GuessedReplicationBacklogNew: 20, } - resultEmergency = result{ + resultEmergency = Result{ Now: sinceZero(10123 * time.Millisecond), RateChange: decreasedRate, lastRateChange: sinceZero(5000 * time.Millisecond), @@ -94,7 +94,7 @@ var ( func TestResultString(t *testing.T) { testcases := []struct { - r result + r Result want string }{ { @@ -134,24 +134,24 @@ reason: emergency state decreased the rate`, func TestResultRing(t *testing.T) { // Test data. - r1 := result{Reason: "r1"} - r2 := result{Reason: "r2"} - r3 := result{Reason: "r3"} + r1 := Result{Reason: "r1"} + r2 := Result{Reason: "r2"} + r3 := Result{Reason: "r3"} rr := newResultRing(2) // Use the ring partially. rr.add(r1) - got, want := rr.latestValues(), []result{r1} + got, want := rr.latestValues(), []Result{r1} require.Equal(t, want, got, "items not correctly added to resultRing") // Use it fully. rr.add(r2) - got, want = rr.latestValues(), []result{r2, r1} + got, want = rr.latestValues(), []Result{r2, r1} require.Equal(t, want, got, "items not correctly added to resultRing") // Let it wrap. rr.add(r3) - got, want = rr.latestValues(), []result{r3, r2} + got, want = rr.latestValues(), []Result{r3, r2} require.Equal(t, want, got, "resultRing did not wrap correctly") } diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index d3d6c954cdb..a12d8f241a2 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -79,6 +79,7 @@ type Throttler interface { UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() MaxLag(tabletType topodatapb.TabletType) uint32 + Log() []Result } // ThrottlerImpl implements a client-side, thread-aware throttler. @@ -366,7 +367,7 @@ func (t *ThrottlerImpl) ResetConfiguration() { t.maxReplicationLagModule.resetConfiguration() } -// log returns the most recent changes of the MaxReplicationLag module. -func (t *ThrottlerImpl) log() []result { +// Log returns the most recent changes of the MaxReplicationLag module. +func (t *ThrottlerImpl) Log() []Result { return t.maxReplicationLagModule.log() } diff --git a/go/vt/throttler/throttlerlogz.go b/go/vt/throttler/throttlerlogz.go index 4023dcd7e68..8cace8ce5c4 100644 --- a/go/vt/throttler/throttlerlogz.go +++ b/go/vt/throttler/throttlerlogz.go @@ -159,7 +159,7 @@ func showThrottlerLog(w http.ResponseWriter, m *managerImpl, name string) { colorLevel = "high" } data := struct { - result + Result ColorLevel string }{r, colorLevel} diff --git a/go/vt/throttler/throttlerlogz_test.go b/go/vt/throttler/throttlerlogz_test.go index fa012532ecf..78151fc750b 100644 --- a/go/vt/throttler/throttlerlogz_test.go +++ b/go/vt/throttler/throttlerlogz_test.go @@ -54,7 +54,7 @@ func TestThrottlerlogzHandler(t *testing.T) { testcases := []struct { desc string - r result + r Result want string }{ { diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 9732d5a28ed..327a37dc43f 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -9,9 +9,11 @@ import ( time "time" gomock "go.uber.org/mock/gomock" + discovery "vitess.io/vitess/go/vt/discovery" throttlerdata "vitess.io/vitess/go/vt/proto/throttlerdata" topodata "vitess.io/vitess/go/vt/proto/topodata" + throttler "vitess.io/vitess/go/vt/throttler" ) // MockThrottler is a mock of Throttler interface. @@ -63,6 +65,20 @@ func (mr *MockThrottlerMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottler)(nil).GetConfiguration)) } +// Log mocks base method. +func (m *MockThrottler) Log() []throttler.Result { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Log") + ret0, _ := ret[0].([]throttler.Result) + return ret0 +} + +// Log indicates an expected call of Log. +func (mr *MockThrottlerMockRecorder) Log() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Log", reflect.TypeOf((*MockThrottler)(nil).Log)) +} + // MaxLag mocks base method. func (m *MockThrottler) MaxLag(arg0 topodata.TabletType) uint32 { m.ctrl.T.Helper()