Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-15.0: pre-backport txthrottler crash fixes #480

Merged
merged 10 commits into from
Aug 21, 2024
52 changes: 32 additions & 20 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"html/template"
Expand Down Expand Up @@ -98,6 +99,9 @@ var (

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -296,6 +300,27 @@ type HealthCheckImpl struct {
healthCheckDialSem *semaphore.Weighted
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
//
// The filters are assembled from the package-level tabletFilters,
// KeyspacesToWatch and tabletFilterTags settings:
//
//   - tabletFilters and KeyspacesToWatch are mutually exclusive; when both are
//     non-empty, errKeyspacesToWatchAndTabletFilters is returned.
//   - a non-empty tabletFilters adds the filter built by NewFilterByShard. If
//     that call fails, its error is returned wrapped (so callers can inspect
//     the cause with errors.Is / errors.As).
//   - otherwise, a non-empty KeyspacesToWatch adds the filter built by
//     NewFilterByKeyspace.
//   - a non-empty tabletFilterTags additionally appends the filter built by
//     NewFilterByTabletTags.
//
// When none of the settings are populated the result is (nil, nil). On error
// the returned filters are always nil.
func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
	// Reject the incompatible combination up front.
	if len(tabletFilters) > 0 && len(KeyspacesToWatch) > 0 {
		return nil, errKeyspacesToWatchAndTabletFilters
	}

	switch {
	case len(tabletFilters) > 0:
		// Use a distinct name so the named result "err" is not shadowed.
		fbs, parseErr := NewFilterByShard(tabletFilters)
		if parseErr != nil {
			return nil, fmt.Errorf("failed to parse tablet_filters value %q: %w", strings.Join(tabletFilters, ","), parseErr)
		}
		filters = append(filters, fbs)
	case len(KeyspacesToWatch) > 0:
		filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
	}

	// Tag filtering composes with either of the filters above.
	if len(tabletFilterTags) > 0 {
		filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
	}
	return filters, nil
}

// NewHealthCheck creates a new HealthCheck object.
// Parameters:
// retryDelay.
Expand All @@ -317,10 +342,14 @@ type HealthCheckImpl struct {
//
// The localCell for this healthcheck
//
// callback.
// cellsToWatch.
//
// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
// Is a list of cells to watch for tablets.
//
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

hc := &HealthCheckImpl{
Expand All @@ -342,27 +371,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
}
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

Expand Down
79 changes: 75 additions & 4 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,77 @@ func init() {
refreshInterval = time.Minute
}

// TestNewVTGateHealthCheckFilters verifies that NewVTGateHealthCheckFilters
// builds the expected filter types (in order) from the package-level
// KeyspacesToWatch, tabletFilters and tabletFilterTags settings, and that it
// returns the expected errors for invalid or incompatible settings.
func TestNewVTGateHealthCheckFilters(t *testing.T) {
	// The test mutates package-level settings; put back whatever was there
	// before so other tests in the package are not affected.
	origKeyspacesToWatch := KeyspacesToWatch
	origTabletFilters := tabletFilters
	origTabletFilterTags := tabletFilterTags
	defer func() {
		KeyspacesToWatch = origKeyspacesToWatch
		tabletFilters = origTabletFilters
		tabletFilterTags = origTabletFilterTags
	}()

	testCases := []struct {
		name                string
		keyspacesToWatch    []string
		tabletFilters       []string
		tabletFilterTags    map[string]string
		expectedError       string
		expectedFilterTypes []any
	}{
		{
			name: "noFilters",
		},
		{
			name:                "tabletFilters",
			tabletFilters:       []string{"ks1|-80"},
			expectedFilterTypes: []any{&FilterByShard{}},
		},
		{
			name:                "keyspacesToWatch",
			keyspacesToWatch:    []string{"ks1"},
			expectedFilterTypes: []any{&FilterByKeyspace{}},
		},
		{
			name:                "tabletFiltersAndTags",
			tabletFilters:       []string{"ks1|-80"},
			tabletFilterTags:    map[string]string{"test": "true"},
			expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}},
		},
		{
			name:                "keyspacesToWatchAndTags",
			tabletFilterTags:    map[string]string{"test": "true"},
			keyspacesToWatch:    []string{"ks1"},
			expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}},
		},
		{
			name:             "failKeyspacesToWatchAndFilters",
			tabletFilters:    []string{"ks1|-80"},
			keyspacesToWatch: []string{"ks1"},
			expectedError:    errKeyspacesToWatchAndTabletFilters.Error(),
		},
		{
			name:          "failInvalidTabletFilters",
			tabletFilters: []string{"shouldfail!@#!"},
			expectedError: "failed to parse tablet_filters value \"shouldfail!@#!\": invalid FilterByShard parameter: shouldfail!@#!",
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			KeyspacesToWatch = testCase.keyspacesToWatch
			tabletFilters = testCase.tabletFilters
			tabletFilterTags = testCase.tabletFilterTags

			filters, err := NewVTGateHealthCheckFilters()
			if testCase.expectedError != "" {
				assert.EqualError(t, err, testCase.expectedError)
			} else {
				// Success cases must not silently return an error.
				assert.NoError(t, err)
			}

			// Stop here on a length mismatch: indexing expectedFilterTypes
			// below would otherwise panic instead of reporting a failure.
			if !assert.Len(t, filters, len(testCase.expectedFilterTypes)) {
				return
			}
			for i, filter := range filters {
				assert.IsType(t, testCase.expectedFilterTypes[i], filter)
			}
		})
	}
}

func TestHealthCheck(t *testing.T) {
// reset error counters
hcErrorCounters.ResetAll()
Expand Down Expand Up @@ -943,7 +1014,7 @@ func TestGetHealthyTablets(t *testing.T) {

func TestPrimaryInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as primary in different cell
Expand Down Expand Up @@ -1000,7 +1071,7 @@ func TestPrimaryInOtherCell(t *testing.T) {

func TestReplicaInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as replica
Expand Down Expand Up @@ -1102,7 +1173,7 @@ func TestReplicaInOtherCell(t *testing.T) {

func TestCellAliases(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Expand Down Expand Up @@ -1248,7 +1319,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
}

func createTestHc(ts *topo.Server) *HealthCheckImpl {
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "")
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
}

type fakeConn struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)
kss := &keyspaceState{
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)

Expand Down
6 changes: 3 additions & 3 deletions go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type replica struct {

// throttler is used to enforce the maximum rate at which replica applies
// transactions. It must not be confused with the client's throttler.
throttler *throttler.Throttler
throttler throttler.Throttler
lastHealthUpdate time.Time
lagUpdateInterval time.Duration

Expand Down Expand Up @@ -224,7 +224,7 @@ type client struct {
primary *primary

healthCheck discovery.HealthCheck
throttler *throttler.Throttler
throttler throttler.Throttler

stopChan chan struct{}
wg sync.WaitGroup
Expand All @@ -237,7 +237,7 @@ func newClient(primary *primary, replica *replica, ts *topo.Server) *client {
log.Fatal(err)
}

healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "")
healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
c := &client{
primary: primary,
healthCheck: healthCheck,
Expand Down
10 changes: 5 additions & 5 deletions go/vt/throttler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ type managerImpl struct {
// mu guards all fields in this group.
mu sync.Mutex
// throttlers tracks all running throttlers (by their name).
throttlers map[string]*Throttler
throttlers map[string]Throttler
}

func newManager() *managerImpl {
return &managerImpl{
throttlers: make(map[string]*Throttler),
throttlers: make(map[string]Throttler),
}
}

func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error {
func (m *managerImpl) registerThrottler(name string, throttler Throttler) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down Expand Up @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string {

// log returns the most recent changes of the MaxReplicationLag module.
// There will be one result for each processed replication lag record.
func (m *managerImpl) log(throttlerName string) ([]result, error) {
func (m *managerImpl) log(throttlerName string) ([]Result, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) {
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
}

return t.log(), nil
return t.Log(), nil
}
2 changes: 1 addition & 1 deletion go/vt/throttler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

type managerTestFixture struct {
m *managerImpl
t1, t2 *Throttler
t1, t2 Throttler
}

func (f *managerTestFixture) setUp() error {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec

m.memory.ageBadRate(now)

r := result{
r := Result{
Now: now,
RateChange: unchangedRate,
lastRateChange: m.lastRateChange,
Expand Down Expand Up @@ -445,7 +445,7 @@ func stateGreater(a, b state) bool {
// and we should not skip the current replica ("lagRecordNow").
// Even if it's the same replica we may skip it and return false because
// we want to wait longer for the propagation of the current rate change.
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
if m.replicaUnderTest == nil {
return true
}
Expand All @@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
return true
}

func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)

oldRate := m.rate.Get()
Expand Down Expand Up @@ -559,7 +559,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
return minDuration
}

func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
// Guess replication rate based on the difference in the replication lag of this
// particular replica.
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
Expand Down Expand Up @@ -630,7 +630,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
// guessReplicationRate guesses the actual replication rate based on the new bac
// Note that "lagDifference" can be positive (lag increased) or negative (lag
// decreased).
func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
// avgReplicationRate is the average rate (per second) at which the replica
// applied transactions from the replication stream. We infer the value
// from the relative change in the replication lag.
Expand Down Expand Up @@ -675,14 +675,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate
return int64(newRate), reason
}

func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)

decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec)
m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason)
}

func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
oldRate := m.rate.Get()
rate := int64(float64(oldRate) - float64(oldRate)*decrease)
if rate == 0 {
Expand All @@ -694,7 +694,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T
m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases())
}

func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
oldRate := m.rate.Get()

m.currentState = newState
Expand Down Expand Up @@ -722,7 +722,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int

// markCurrentRateAsBadOrGood determines the actual rate between the last rate
// change and "now" and determines if that rate was bad or good.
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) {
if m.lastRateChange.IsZero() {
// Module was just started. We don't have any data points yet.
r.GoodOrBad = ignoredRate
Expand Down Expand Up @@ -796,6 +796,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time
}
}

func (m *MaxReplicationLagModule) log() []result {
func (m *MaxReplicationLagModule) log() []Result {
return m.results.latestValues()
}
Loading
Loading