From f53e08963aa59d375f7b807c90c64ce03d84294a Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 11 Jun 2024 11:34:48 +0200 Subject: [PATCH 1/5] Fix race in `replicationLagModule` of `go/vt/throttle` Signed-off-by: Tim Vaillancourt --- go/vt/throttler/replication_lag_cache.go | 40 ++++++++++ go/vt/throttler/replication_lag_cache_test.go | 7 ++ go/vt/throttler/throttler.go | 18 +---- go/vt/throttler/throttler_test.go | 79 +++++++++++++++++++ 4 files changed, 129 insertions(+), 15 deletions(-) diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go index c9c2e94f113..ab26c0bc6b8 100644 --- a/go/vt/throttler/replication_lag_cache.go +++ b/go/vt/throttler/replication_lag_cache.go @@ -18,6 +18,7 @@ package throttler import ( "sort" + "sync" "time" "vitess.io/vitess/go/vt/discovery" @@ -30,6 +31,8 @@ type replicationLagCache struct { // The map key is replicationLagRecord.LegacyTabletStats.Key. entries map[string]*replicationLagHistory + mu sync.Mutex + // slowReplicas is a set of slow replicas. // The map key is replicationLagRecord.LegacyTabletStats.Key. // This map will always be recomputed by sortByLag() and must not be modified @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache // add inserts or updates "r" in the cache for the replica with the key "r.Key". func (c *replicationLagCache) add(r replicationLagRecord) { + c.mu.Lock() + defer c.mu.Unlock() + if !r.Serving { // Tablet is down. Do no longer track it. delete(c.entries, discovery.TabletToMapKey(r.Tablet)) @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) { entry.add(r) } +// maxLag returns the maximum replication lag for the entries in cache. +func (c *replicationLagCache) maxLag() (maxLag uint32) { + c.mu.Lock() + defer c.mu.Unlock() + + for key := range c.entries { + if c.isIgnored(key) { + continue + } + + entry, ok := c.entries[key] + if !ok { + continue + } + + latest := entry.latest() + if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // latest returns the current lag record for the given LegacyTabletStats.Key string. // A zero record is returned if there is no latest entry. func (c *replicationLagCache) latest(key string) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord { // or just after it. // If there is no such record, a zero record is returned. func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag // sortByLag sorts all replicas by their latest replication lag value and // tablet uid and updates the c.slowReplicas set. func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) { + c.mu.Lock() + defer c.mu.Unlock() + // Reset the current list of ignored replicas. c.slowReplicas = make(map[string]bool) @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool { // this slow replica. // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key. func (c *replicationLagCache) ignoreSlowReplica(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.slowReplicas) == 0 { // No slow replicas at all. return false diff --git a/go/vt/throttler/replication_lag_cache_test.go b/go/vt/throttler/replication_lag_cache_test.go index 135c0f03956..a81c800ebbe 100644 --- a/go/vt/throttler/replication_lag_cache_test.go +++ b/go/vt/throttler/replication_lag_cache_test.go @@ -84,3 +84,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) { require.True(t, c.slowReplicas[r1Key], "r1 should be tracked as a slow replica") } + +func TestReplicationLagCache_MaxLag(t *testing.T) { + c := newReplicationLagCache(2) + c.add(lagRecord(sinceZero(1*time.Second), r1, 30)) + c.add(lagRecord(sinceZero(1*time.Second), r2, 1)) + require.Equal(t, uint32(30), c.maxLag()) +} diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 909888bd0d4..19b95559fed 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -229,22 +229,10 @@ func (t *Throttler) Throttle(threadID int) time.Duration { // the provided type, excluding ignored tablets. func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) - - var maxLag uint32 - cacheEntries := cache.entries - - for key := range cacheEntries { - if cache.isIgnored(key) { - continue - } - - lag := cache.latest(key).Stats.ReplicationLagSeconds - if lag > maxLag { - maxLag = lag - } + if cache == nil { + return 0 } - - return maxLag + return cache.maxLag() } // ThreadFinished marks threadID as finished and redistributes the thread's diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index b33bb2ca255..1a64c149cd4 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -17,11 +17,17 @@ limitations under the License. package throttler import ( + "context" "runtime" + "sync" "testing" "time" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" ) // The main purpose of the benchmarks below is to demonstrate the functionality @@ -398,3 +404,76 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { }() throttler.ThreadFinished(0) } + +func TestThrottlerMaxLag(t *testing.T) { + fc := &fakeClock{} + throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) + require.NoError(t, err) + defer throttler.Close() + + require.NotNil(t, throttler) + require.NotNil(t, throttler.maxReplicationLagModule) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // run .add() and .MaxLag() concurrently to detect races + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, t *Throttler, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + throttler.MaxLag(tabletType) + } + } + }(&wg, ctx, throttler, tabletType) + + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, throttler *Throttler, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType) + require.NotNil(t, cache) + cache.add(replicationLagRecord{ + time: time.Now(), + TabletHealth: discovery.TabletHealth{ + Serving: true, + Stats: &query.RealtimeStats{ + ReplicationLagSeconds: 5, + }, + Tablet: &topodata.Tablet{ + Hostname: t.Name(), + Type: tabletType, + PortMap: map[string]int32{ + "test": 15999, + }, + }, + }, + }) + } + } + }(&wg, ctx, throttler, tabletType) + } + time.Sleep(time.Second) + cancel() + wg.Wait() + + // check .MaxLag() + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) + } +} From 49e2dc7454f185f36ce7bfb08a84364b091ba9f8 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 2 Oct 2024 19:31:27 +0200 Subject: [PATCH 2/5] PR suggestions Signed-off-by: Tim Vaillancourt --- go/vt/throttler/throttler_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index 91743ff38d6..e3c7da3cdfe 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -419,6 +419,8 @@ func TestThrottlerMaxLag(t *testing.T) { require.NotNil(t, throttler.maxReplicationLagModule) ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var wg sync.WaitGroup // run .add() and .MaxLag() concurrently to detect races @@ -427,7 +429,7 @@ func TestThrottlerMaxLag(t *testing.T) { topodata.TabletType_RDONLY, } { wg.Add(1) - go func(wg *sync.WaitGroup, ctx context.Context, t *Throttler, tabletType topodata.TabletType) { + go func() { defer wg.Done() for { select { @@ -437,7 +439,7 @@ func TestThrottlerMaxLag(t *testing.T) { throttler.MaxLag(tabletType) } } - }(&wg, ctx, throttler, tabletType) + }() wg.Add(1) go func(wg *sync.WaitGroup, ctx context.Context, throttler *Throttler, tabletType topodata.TabletType) { From e556e2fbeb8a37608b2aacc5677863b0faa462c9 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 2 Oct 2024 21:23:29 +0200 Subject: [PATCH 3/5] fix test rebasing broke Signed-off-by: Tim Vaillancourt --- go/vt/throttler/throttler_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index e3c7da3cdfe..c6374707e2a 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -411,8 +411,9 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { func TestThrottlerMaxLag(t *testing.T) { fc := &fakeClock{} - throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) + th, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) require.NoError(t, err) + throttler := th.(*ThrottlerImpl) defer throttler.Close() require.NotNil(t, throttler) @@ -442,7 +443,7 @@ func TestThrottlerMaxLag(t *testing.T) { }() wg.Add(1) - go func(wg *sync.WaitGroup, ctx context.Context, throttler *Throttler, tabletType topodata.TabletType) { + go func(wg *sync.WaitGroup, ctx context.Context, throttler *ThrottlerImpl, tabletType topodata.TabletType) { defer wg.Done() for { select { From df59fcf058ba36e4a326110e3f68aed10dc72a83 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 4 Oct 2024 18:09:24 +0200 Subject: [PATCH 4/5] Update go/vt/throttler/replication_lag_cache.go Co-authored-by: Matt Lord Signed-off-by: Tim Vaillancourt --- go/vt/throttler/replication_lag_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go index ab26c0bc6b8..d47b82474ac 100644 --- a/go/vt/throttler/replication_lag_cache.go +++ b/go/vt/throttler/replication_lag_cache.go @@ -92,8 +92,8 @@ func (c *replicationLagCache) maxLag() (maxLag uint32) { continue } - entry, ok := c.entries[key] - if !ok { + entry := c.entries[key] + if entry == nil { continue } From 1da308b25e22fbf05f4b9735fb99de80979e7c9b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 4 Oct 2024 18:13:48 +0200 Subject: [PATCH 5/5] PR suggestions Signed-off-by: Tim Vaillancourt --- go/vt/throttler/throttler_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index c6374707e2a..d321207b8ca 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -30,6 +30,12 @@ import ( "vitess.io/vitess/go/vt/proto/topodata" ) +// testTabletTypes is the list of tablet types to test. +var testTabletTypes = []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, +} + // The main purpose of the benchmarks below is to demonstrate the functionality // of the throttler in the real-world (using a non-faked time.Now). // The benchmark values should be as close as possible to the request interval @@ -425,10 +431,7 @@ func TestThrottlerMaxLag(t *testing.T) { var wg sync.WaitGroup // run .add() and .MaxLag() concurrently to detect races - for _, tabletType := range []topodata.TabletType{ - topodata.TabletType_REPLICA, - topodata.TabletType_RDONLY, - } { + for _, tabletType := range testTabletTypes { wg.Add(1) go func() { defer wg.Done() @@ -443,7 +446,7 @@ func TestThrottlerMaxLag(t *testing.T) { }() wg.Add(1) - go func(wg *sync.WaitGroup, ctx context.Context, throttler *ThrottlerImpl, tabletType topodata.TabletType) { + go func() { defer wg.Done() for { select { @@ -470,17 +473,14 @@ func TestThrottlerMaxLag(t *testing.T) { }) } } - }(&wg, ctx, throttler, tabletType) + }() } time.Sleep(time.Second) cancel() wg.Wait() // check .MaxLag() - for _, tabletType := range []topodata.TabletType{ - topodata.TabletType_REPLICA, - topodata.TabletType_RDONLY, - } { + for _, tabletType := range testTabletTypes { require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) } }