Skip to content

Commit

Permalink
Add .Log() to interface
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Jun 17, 2024
1 parent 2eaf7ef commit 5b2294f
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 43 deletions.
9 changes: 2 additions & 7 deletions go/vt/throttler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package throttler

import (
"errors"
"fmt"
"sort"
"sync"
Expand Down Expand Up @@ -208,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string {

// log returns the most recent changes of the MaxReplicationLag module.
// There will be one result for each processed replication lag record.
func (m *managerImpl) log(throttlerName string) ([]result, error) {
func (m *managerImpl) log(throttlerName string) ([]Result, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -217,9 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) {
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
}

throttlerImpl, ok := t.(*ThrottlerImpl)
if !ok {
return nil, errors.New("unexpected throttler implementation")
}
return throttlerImpl.log(), nil
return t.Log(), nil
}
20 changes: 10 additions & 10 deletions go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec

m.memory.ageBadRate(now)

r := result{
r := Result{
Now: now,
RateChange: unchangedRate,
lastRateChange: m.lastRateChange,
Expand Down Expand Up @@ -446,7 +446,7 @@ func stateGreater(a, b state) bool {
// and we should not skip the current replica ("lagRecordNow").
// Even if it's the same replica we may skip it and return false because
// we want to wait longer for the propagation of the current rate change.
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
if m.replicaUnderTest == nil {
return true
}
Expand All @@ -472,7 +472,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
return true
}

func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)

oldRate := m.rate.Load()
Expand Down Expand Up @@ -560,7 +560,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
return minDuration
}

func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
// Guess replication rate based on the difference in the replication lag of this
// particular replica.
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
Expand Down Expand Up @@ -631,7 +631,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
// guessReplicationRate guesses the actual replication rate based on the new bac
// Note that "lagDifference" can be positive (lag increased) or negative (lag
// decreased).
func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
// avgReplicationRate is the average rate (per second) at which the replica
// applied transactions from the replication stream. We infer the value
// from the relative change in the replication lag.
Expand Down Expand Up @@ -676,14 +676,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate
return int64(newRate), reason
}

func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)

decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec)
m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason)
}

func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
oldRate := m.rate.Load()
rate := int64(float64(oldRate) - float64(oldRate)*decrease)
if rate == 0 {
Expand All @@ -695,7 +695,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T
m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases())
}

func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
oldRate := m.rate.Load()

m.currentState = newState
Expand Down Expand Up @@ -723,7 +723,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int

// markCurrentRateAsBadOrGood determines the actual rate between the last rate
// change and "now" and determines if that rate was bad or good.
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) {
if m.lastRateChange.IsZero() {
// Module was just started. We don't have any data points yet.
r.GoodOrBad = ignoredRate
Expand Down Expand Up @@ -797,6 +797,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time
}
}

func (m *MaxReplicationLagModule) log() []result {
func (m *MaxReplicationLagModule) log() []Result {
return m.results.latestValues()
}
24 changes: 12 additions & 12 deletions go/vt/throttler/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}}
lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}}
reason: {{.Reason}}`))

// result is generated by the MaxReplicationLag module for each processed
// Result is generated by the MaxReplicationLag module for each processed
// "replicationLagRecord".
// It captures the details and the decision of the processing.
type result struct {
type Result struct {
Now time.Time
RateChange rateChange
lastRateChange time.Time
Expand All @@ -80,33 +80,33 @@ type result struct {
GuessedReplicationBacklogNew int
}

func (r result) String() string {
func (r Result) String() string {
var b strings.Builder
if err := resultStringTemplate.Execute(&b, r); err != nil {
panic(fmt.Sprintf("failed to Execute() template: %v", err))
}
return b.String()
}

func (r result) Alias() string {
func (r Result) Alias() string {
return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias)
}

func (r result) TimeSinceLastRateChange() string {
func (r Result) TimeSinceLastRateChange() string {
if r.lastRateChange.IsZero() {
return "n/a"
}
return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds())
}

func (r result) LagBefore() string {
func (r Result) LagBefore() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds)
}

func (r result) AgeOfBeforeLag() string {
func (r Result) AgeOfBeforeLag() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
Expand All @@ -123,18 +123,18 @@ type resultRing struct {
// started reusing entries.
wrapped bool
// values is the underlying ring buffer.
values []result
values []Result
}

// newResultRing creates a new resultRing.
func newResultRing(capacity int) *resultRing {
return &resultRing{
values: make([]result, capacity),
values: make([]Result, capacity),
}
}

// add inserts a new result into the ring buffer.
func (rr *resultRing) add(r result) {
func (rr *resultRing) add(r Result) {
rr.mu.Lock()
defer rr.mu.Unlock()

Expand All @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) {

// latestValues returns all values of the buffer. Entries are sorted in reverse
// chronological order i.e. newer items come first.
func (rr *resultRing) latestValues() []result {
func (rr *resultRing) latestValues() []Result {
rr.mu.Lock()
defer rr.mu.Unlock()

Expand All @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result {
count = rr.position
}

results := make([]result, count)
results := make([]Result, count)
for i := 0; i < count; i++ {
pos := start - i
if pos < 0 {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var (
resultIncreased = result{
resultIncreased = Result{
Now: sinceZero(1234 * time.Millisecond),
RateChange: increasedRate,
lastRateChange: sinceZero(1 * time.Millisecond),
Expand All @@ -46,7 +46,7 @@ var (
GuessedReplicationBacklogOld: 0,
GuessedReplicationBacklogNew: 0,
}
resultDecreased = result{
resultDecreased = Result{
Now: sinceZero(5000 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(1234 * time.Millisecond),
Expand All @@ -68,7 +68,7 @@ var (
GuessedReplicationBacklogOld: 10,
GuessedReplicationBacklogNew: 20,
}
resultEmergency = result{
resultEmergency = Result{
Now: sinceZero(10123 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(5000 * time.Millisecond),
Expand All @@ -94,7 +94,7 @@ var (

func TestResultString(t *testing.T) {
testcases := []struct {
r result
r Result
want string
}{
{
Expand Down Expand Up @@ -134,24 +134,24 @@ reason: emergency state decreased the rate`,

func TestResultRing(t *testing.T) {
// Test data.
r1 := result{Reason: "r1"}
r2 := result{Reason: "r2"}
r3 := result{Reason: "r3"}
r1 := Result{Reason: "r1"}
r2 := Result{Reason: "r2"}
r3 := Result{Reason: "r3"}

rr := newResultRing(2)

// Use the ring partially.
rr.add(r1)
got, want := rr.latestValues(), []result{r1}
got, want := rr.latestValues(), []Result{r1}
require.Equal(t, want, got, "items not correctly added to resultRing")

// Use it fully.
rr.add(r2)
got, want = rr.latestValues(), []result{r2, r1}
got, want = rr.latestValues(), []Result{r2, r1}
require.Equal(t, want, got, "items not correctly added to resultRing")

// Let it wrap.
rr.add(r3)
got, want = rr.latestValues(), []result{r3, r2}
got, want = rr.latestValues(), []Result{r3, r2}
require.Equal(t, want, got, "resultRing did not wrap correctly")
}
5 changes: 3 additions & 2 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Throttler interface {
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
MaxLag(tabletType topodatapb.TabletType) uint32
Log() []Result
}

// ThrottlerImpl implements a client-side, thread-aware throttler.
Expand Down Expand Up @@ -366,7 +367,7 @@ func (t *ThrottlerImpl) ResetConfiguration() {
t.maxReplicationLagModule.resetConfiguration()
}

// log returns the most recent changes of the MaxReplicationLag module.
func (t *ThrottlerImpl) log() []result {
// Log returns the most recent changes of the MaxReplicationLag module.
func (t *ThrottlerImpl) Log() []Result {
return t.maxReplicationLagModule.log()
}
2 changes: 1 addition & 1 deletion go/vt/throttler/throttlerlogz.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func showThrottlerLog(w http.ResponseWriter, m *managerImpl, name string) {
colorLevel = "high"
}
data := struct {
result
Result
ColorLevel string
}{r, colorLevel}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/throttlerlogz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestThrottlerlogzHandler(t *testing.T) {

testcases := []struct {
desc string
r result
r Result
want string
}{
{
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5b2294f

Please sign in to comment.