Skip to content

Commit

Permalink
Fix race in replicationLagModule of go/vt/throttle
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Jun 11, 2024
1 parent 2531cd0 commit f53e089
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 15 deletions.
40 changes: 40 additions & 0 deletions go/vt/throttler/replication_lag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package throttler

import (
"sort"
"sync"
"time"

"vitess.io/vitess/go/vt/discovery"
Expand All @@ -30,6 +31,8 @@ type replicationLagCache struct {
// The map key is replicationLagRecord.LegacyTabletStats.Key.
entries map[string]*replicationLagHistory

mu sync.Mutex

// slowReplicas is a set of slow replicas.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
// This map will always be recomputed by sortByLag() and must not be modified
Expand Down Expand Up @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
c.mu.Lock()
defer c.mu.Unlock()

if !r.Serving {
// Tablet is down. Do no longer track it.
delete(c.entries, discovery.TabletToMapKey(r.Tablet))
Expand All @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) {
entry.add(r)
}

// maxLag returns the maximum replication lag for the entries in cache.
func (c *replicationLagCache) maxLag() (maxLag uint32) {
c.mu.Lock()
defer c.mu.Unlock()

for key := range c.entries {
if c.isIgnored(key) {
continue
}

entry, ok := c.entries[key]
if !ok {
continue
}

latest := entry.latest()
if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
func (c *replicationLagCache) latest(key string) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord {
// or just after it.
// If there is no such record, a zero record is returned.
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag
// sortByLag sorts all replicas by their latest replication lag value and
// tablet uid and updates the c.slowReplicas set.
func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) {
c.mu.Lock()
defer c.mu.Unlock()

// Reset the current list of ignored replicas.
c.slowReplicas = make(map[string]bool)

Expand Down Expand Up @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool {
// this slow replica.
// "key" refers to ReplicationLagRecord.LegacyTabletStats.Key.
func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.slowReplicas) == 0 {
// No slow replicas at all.
return false
Expand Down
7 changes: 7 additions & 0 deletions go/vt/throttler/replication_lag_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) {

require.True(t, c.slowReplicas[r1Key], "r1 should be tracked as a slow replica")
}

func TestReplicationLagCache_MaxLag(t *testing.T) {
c := newReplicationLagCache(2)
c.add(lagRecord(sinceZero(1*time.Second), r1, 30))
c.add(lagRecord(sinceZero(1*time.Second), r2, 1))
require.Equal(t, uint32(30), c.maxLag())
}
18 changes: 3 additions & 15 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,10 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
if cache == nil {
return 0
}

return maxLag
return cache.maxLag()
}

// ThreadFinished marks threadID as finished and redistributes the thread's
Expand Down
79 changes: 79 additions & 0 deletions go/vt/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ limitations under the License.
package throttler

import (
"context"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// The main purpose of the benchmarks below is to demonstrate the functionality
Expand Down Expand Up @@ -398,3 +404,76 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) {
}()
throttler.ThreadFinished(0)
}

func TestThrottlerMaxLag(t *testing.T) {
fc := &fakeClock{}
throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now)
require.NoError(t, err)
defer throttler.Close()

require.NotNil(t, throttler)
require.NotNil(t, throttler.maxReplicationLagModule)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

// run .add() and .MaxLag() concurrently to detect races
for _, tabletType := range []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
} {
wg.Add(1)
go func(wg *sync.WaitGroup, ctx context.Context, t *Throttler, tabletType topodata.TabletType) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
throttler.MaxLag(tabletType)
}
}
}(&wg, ctx, throttler, tabletType)

wg.Add(1)
go func(wg *sync.WaitGroup, ctx context.Context, throttler *Throttler, tabletType topodata.TabletType) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
require.NotNil(t, cache)
cache.add(replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{
Serving: true,
Stats: &query.RealtimeStats{
ReplicationLagSeconds: 5,
},
Tablet: &topodata.Tablet{
Hostname: t.Name(),
Type: tabletType,
PortMap: map[string]int32{
"test": 15999,
},
},
},
})
}
}
}(&wg, ctx, throttler, tabletType)
}
time.Sleep(time.Second)
cancel()
wg.Wait()

// check .MaxLag()
for _, tabletType := range []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
} {
require.Equal(t, uint32(5), throttler.MaxLag(tabletType))
}
}

0 comments on commit f53e089

Please sign in to comment.