From 2a0ffd49fc2c082a58918dc51d1ef25d2290b612 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 03:07:26 +0100 Subject: [PATCH 01/14] `vtorc`: simplify discovery queue Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 83 ++++++++++++----------------- go/vt/vtorc/discovery/queue_test.go | 43 +++++++++++++++ go/vt/vtorc/logic/vtorc.go | 3 +- 3 files changed, 77 insertions(+), 52 deletions(-) create mode 100644 go/vt/vtorc/discovery/queue_test.go diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index bf279b781f2..ac1aaadfcfb 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -33,33 +33,35 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" ) -// Queue contains information for managing discovery requests +// queueItem represents an item in the discovery.Queue. +type queueItem struct { + CreatedAt time.Time + Key string +} + +// Queue is an implementation of discovery.Queue. type Queue struct { sync.Mutex - - name string - done chan struct{} - queue chan string - queuedKeys map[string]time.Time - consumedKeys map[string]time.Time + enqueued map[string]struct{} + nowFunc func() time.Time + queue chan queueItem } -// CreateQueue allows for creation of a new discovery queue -func CreateQueue(name string) *Queue { +// NewQueue creates a new queue. +func NewQueue() *Queue { return &Queue{ - name: name, - queuedKeys: make(map[string]time.Time), - consumedKeys: make(map[string]time.Time), - queue: make(chan string, config.DiscoveryQueueCapacity), + enqueued: make(map[string]struct{}), + nowFunc: func() time.Time { return time.Now() }, + queue: make(chan queueItem, config.DiscoveryQueueCapacity), } } -// QueueLen returns the length of the queue (channel size + queued size) +// QueueLen returns the length of the queue. func (q *Queue) QueueLen() int { q.Lock() defer q.Unlock() - return len(q.queue) + len(q.queuedKeys) + return len(q.queue) + len(q.enqueued) } // Push enqueues a key if it is not on a queue and is not being @@ -68,50 +70,31 @@ func (q *Queue) Push(key string) { q.Lock() defer q.Unlock() - // is it enqueued already? - if _, found := q.queuedKeys[key]; found { + if _, found := q.enqueued[key]; found { return } - - // is it being processed now? - if _, found := q.consumedKeys[key]; found { - return + q.enqueued[key] = struct{}{} + q.queue <- queueItem{ + CreatedAt: q.nowFunc(), + Key: key, } - - q.queuedKeys[key] = time.Now() - q.queue <- key } // Consume fetches a key to process; blocks if queue is empty. // Release must be called once after Consume. func (q *Queue) Consume() string { - q.Lock() - queue := q.queue - q.Unlock() - - key := <-queue - - q.Lock() - defer q.Unlock() - - // alarm if have been waiting for too long - timeOnQueue := time.Since(q.queuedKeys[key]) + var item queueItem + func() { + q.Lock() + defer q.Unlock() + item = <-q.queue + delete(q.enqueued, item.Key) + }() + + timeOnQueue := time.Since(item.CreatedAt) if timeOnQueue > config.GetInstancePollTime() { - log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds()) + log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", item.Key, timeOnQueue.Seconds()) } - q.consumedKeys[key] = q.queuedKeys[key] - - delete(q.queuedKeys, key) - - return key -} - -// Release removes a key from a list of being processed keys -// which allows that key to be pushed into the queue again. -func (q *Queue) Release(key string) { - q.Lock() - defer q.Unlock() - - delete(q.consumedKeys, key) + return item.Key } diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go new file mode 100644 index 00000000000..48daa2b986c --- /dev/null +++ b/go/vt/vtorc/discovery/queue_test.go @@ -0,0 +1,43 @@ +package discovery + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestQueue(t *testing.T) { + now := time.Now() + q := NewQueue() + q.nowFunc = func() time.Time { return now } + require.Zero(t, q.QueueLen()) + + // Push + q.Push(t.Name()) + require.Equal(t, 2, q.QueueLen()) + + // Consume + require.Equal(t, t.Name(), q.Consume()) + require.Zero(t, q.QueueLen()) +} + +func BenchmarkQueues(b *testing.B) { + tests := []struct { + name string + q Queue + }{ + {"LegacyQueue", CreateQueue("test")}, + {"QueueV2", NewQueue()}, + } + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + test.q.Push(strconv.Itoa(i)) + test.q.QueueLen() + test.q.Release(test.q.Consume()) + } + }) + } +} diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 5ac5af50d47..c3b7cd09275 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -108,14 +108,13 @@ func waitForLocksRelease() { // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon // instance discovery per entry. func handleDiscoveryRequests() { - discoveryQueue = discovery.CreateQueue("DEFAULT") + discoveryQueue = discovery.NewQueue() // create a pool of discovery workers for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { go func() { for { tabletAlias := discoveryQueue.Consume() DiscoverInstance(tabletAlias, false /* forceDiscovery */) - discoveryQueue.Release(tabletAlias) } }() } From 1b5fc7912902bf4a8b0d90760b2812ca20a8d601 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 03:10:06 +0100 Subject: [PATCH 02/14] header Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 48daa2b986c..001434e1257 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package discovery import ( From 727024bb11e4453a7dc96da5b8308e944ce9a89f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 03:13:56 +0100 Subject: [PATCH 03/14] cleanup Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 4 +--- go/vt/vtorc/discovery/queue_test.go | 23 ----------------------- 2 files changed, 1 insertion(+), 26 deletions(-) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index ac1aaadfcfb..5e1354cbc10 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -43,7 +43,6 @@ type queueItem struct { type Queue struct { sync.Mutex enqueued map[string]struct{} - nowFunc func() time.Time queue chan queueItem } @@ -51,7 +50,6 @@ type Queue struct { func NewQueue() *Queue { return &Queue{ enqueued: make(map[string]struct{}), - nowFunc: func() time.Time { return time.Now() }, queue: make(chan queueItem, config.DiscoveryQueueCapacity), } } @@ -75,7 +73,7 @@ func (q *Queue) Push(key string) { } q.enqueued[key] = struct{}{} q.queue <- queueItem{ - CreatedAt: q.nowFunc(), + CreatedAt: time.Now(), Key: key, } } diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 001434e1257..2823f1bc28c 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -17,17 +17,13 @@ limitations under the License. package discovery import ( - "strconv" "testing" - "time" "github.com/stretchr/testify/require" ) func TestQueue(t *testing.T) { - now := time.Now() q := NewQueue() - q.nowFunc = func() time.Time { return now } require.Zero(t, q.QueueLen()) // Push @@ -38,22 +34,3 @@ func TestQueue(t *testing.T) { require.Equal(t, t.Name(), q.Consume()) require.Zero(t, q.QueueLen()) } - -func BenchmarkQueues(b *testing.B) { - tests := []struct { - name string - q Queue - }{ - {"LegacyQueue", CreateQueue("test")}, - {"QueueV2", NewQueue()}, - } - for _, test := range tests { - b.Run(test.name, func(b *testing.B) { - for i := 0; i < b.N; i++ { - test.q.Push(strconv.Itoa(i)) - test.q.QueueLen() - test.q.Release(test.q.Consume()) - } - }) - } -} From b5631c9b5491b9dc85f224f7b5ef8d2761920af6 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 03:48:19 +0100 Subject: [PATCH 04/14] more tests Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 2823f1bc28c..8a86c8f902c 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -29,8 +29,12 @@ func TestQueue(t *testing.T) { // Push q.Push(t.Name()) require.Equal(t, 2, q.QueueLen()) + _, found := q.enqueued[t.Name()] + require.True(t, found) // Consume require.Equal(t, t.Name(), q.Consume()) require.Zero(t, q.QueueLen()) + _, found = q.enqueued[t.Name()] + require.False(t, found) } From eacf71d1c1d0847371533f94dc69887ab59952bc Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 13:41:59 +0100 Subject: [PATCH 05/14] `vtorc`: lockless discover queue, add concurrency flag Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/config/config.go | 17 +++++++++++++++- go/vt/vtorc/discovery/queue.go | 30 +++++------------------------ go/vt/vtorc/discovery/queue_test.go | 6 +----- go/vt/vtorc/logic/vtorc.go | 2 +- 4 files changed, 23 insertions(+), 32 deletions(-) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index db367673aeb..209e5c246a2 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -32,7 +32,6 @@ const ( AuditPageSize = 20 DebugMetricsIntervalSeconds = 10 StaleInstanceCoordinatesExpireSeconds = 60 - DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery DiscoveryQueueCapacity = 100000 DiscoveryQueueMaxStatisticsSize = 120 DiscoveryCollectionRetentionSeconds = 120 @@ -58,6 +57,15 @@ var ( }, ) + discoveryMaxConcurrency = viperutil.Configure( + "discovery-max-concurrency", + viperutil.Options[int]{ + FlagName: "discovery-max-concurrency", + Default: 300, + Dynamic: true, + }, + ) + sqliteDataFile = viperutil.Configure( "sqlite-data-file", viperutil.Options[string]{ @@ -191,6 +199,7 @@ func init() { // registerFlags registers the flags required by VTOrc func registerFlags(fs *pflag.FlagSet) { + fs.Int("discovery-max-concurrency", discoveryMaxConcurrency.Default(), "Number of goroutines doing tablet discovery") fs.String("sqlite-data-file", sqliteDataFile.Default(), "SQLite Datafile to use as VTOrc's database") fs.Duration("instance-poll-time", instancePollTime.Default(), "Timer duration on which VTOrc refreshes MySQL information") fs.Duration("snapshot-topology-interval", snapshotTopologyInterval.Default(), "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours") @@ -211,6 +220,7 @@ func registerFlags(fs *pflag.FlagSet) { viperutil.BindFlags(fs, instancePollTime, preventCrossCellFailover, + discoveryMaxConcurrency, sqliteDataFile, snapshotTopologyInterval, reasonableReplicationLag, @@ -248,6 +258,11 @@ func GetPreventCrossCellFailover() bool { return preventCrossCellFailover.Get() } +// GetDiscoveryMaxConcurrency is a getter function. +func GetDiscoveryMaxConcurrency() uint { + return uint(discoveryMaxConcurrency.Get()) +} + // GetSQLiteDataFile is a getter function. func GetSQLiteDataFile() string { return sqliteDataFile.Get() diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 5e1354cbc10..049a477806b 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -17,7 +17,7 @@ /* package discovery manages a queue of discovery requests: an ordered -queue with no duplicates. +queue. push() operation never blocks while pop() blocks on an empty queue. @@ -26,7 +26,6 @@ push() operation never blocks while pop() blocks on an empty queue. package discovery import ( - "sync" "time" "vitess.io/vitess/go/vt/log" @@ -41,37 +40,24 @@ type queueItem struct { // Queue is an implementation of discovery.Queue. type Queue struct { - sync.Mutex - enqueued map[string]struct{} - queue chan queueItem + queue chan queueItem } // NewQueue creates a new queue. func NewQueue() *Queue { return &Queue{ - enqueued: make(map[string]struct{}), - queue: make(chan queueItem, config.DiscoveryQueueCapacity), + queue: make(chan queueItem, config.DiscoveryQueueCapacity), } } // QueueLen returns the length of the queue. func (q *Queue) QueueLen() int { - q.Lock() - defer q.Unlock() - - return len(q.queue) + len(q.enqueued) + return len(q.queue) } // Push enqueues a key if it is not on a queue and is not being // processed; silently returns otherwise. func (q *Queue) Push(key string) { - q.Lock() - defer q.Unlock() - - if _, found := q.enqueued[key]; found { - return - } - q.enqueued[key] = struct{}{} q.queue <- queueItem{ CreatedAt: time.Now(), Key: key, @@ -81,13 +67,7 @@ func (q *Queue) Push(key string) { // Consume fetches a key to process; blocks if queue is empty. // Release must be called once after Consume. func (q *Queue) Consume() string { - var item queueItem - func() { - q.Lock() - defer q.Unlock() - item = <-q.queue - delete(q.enqueued, item.Key) - }() + item := <-q.queue timeOnQueue := time.Since(item.CreatedAt) if timeOnQueue > config.GetInstancePollTime() { diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 8a86c8f902c..0efbd047553 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -28,13 +28,9 @@ func TestQueue(t *testing.T) { // Push q.Push(t.Name()) - require.Equal(t, 2, q.QueueLen()) - _, found := q.enqueued[t.Name()] - require.True(t, found) + require.Equal(t, 1, q.QueueLen()) // Consume require.Equal(t, t.Name(), q.Consume()) require.Zero(t, q.QueueLen()) - _, found = q.enqueued[t.Name()] - require.False(t, found) } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index c3b7cd09275..fe03edffe1c 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -110,7 +110,7 @@ func waitForLocksRelease() { func handleDiscoveryRequests() { discoveryQueue = discovery.NewQueue() // create a pool of discovery workers - for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ { + for i := uint(0); i < config.GetDiscoveryMaxConcurrency(); i++ { go func() { for { tabletAlias := discoveryQueue.Consume() From bffb1f856425173ed9bc0fc2cfc5698cf1df8881 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 13:46:23 +0100 Subject: [PATCH 06/14] update flags e2e Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtorc.txt | 1 + go/vt/vtorc/config/config.go | 2 +- go/vt/vtorc/discovery/queue.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 57eb907cf4d..e5b493af45c 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -32,6 +32,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. + --discovery-max-concurrency int Number of workers for used for tablet discovery (default 300) --emit_stats If set, emit stats to push-based monitoring and stats backends --enable-primary-disk-stalled-recovery Whether VTOrc should detect a stalled disk on the primary and failover --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index 209e5c246a2..91f2d9bfa76 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -199,7 +199,7 @@ func init() { // registerFlags registers the flags required by VTOrc func registerFlags(fs *pflag.FlagSet) { - fs.Int("discovery-max-concurrency", discoveryMaxConcurrency.Default(), "Number of goroutines doing tablet discovery") + fs.Int("discovery-max-concurrency", discoveryMaxConcurrency.Default(), "Number of workers for used for tablet discovery") fs.String("sqlite-data-file", sqliteDataFile.Default(), "SQLite Datafile to use as VTOrc's database") fs.Duration("instance-poll-time", instancePollTime.Default(), "Timer duration on which VTOrc refreshes MySQL information") fs.Duration("snapshot-topology-interval", snapshotTopologyInterval.Default(), "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours") diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 049a477806b..0b9eeff3cfd 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -71,7 +71,7 @@ func (q *Queue) Consume() string { timeOnQueue := time.Since(item.CreatedAt) if timeOnQueue > config.GetInstancePollTime() { - log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", item.Key, timeOnQueue.Seconds()) + log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) } return item.Key From 49edc9eb4c127fbecc9c20ec98930fe3ff678365 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 13:48:01 +0100 Subject: [PATCH 07/14] fix typo Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index 91f2d9bfa76..ab835527664 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -199,7 +199,7 @@ func init() { // registerFlags registers the flags required by VTOrc func registerFlags(fs *pflag.FlagSet) { - fs.Int("discovery-max-concurrency", discoveryMaxConcurrency.Default(), "Number of workers for used for tablet discovery") + fs.Int("discovery-max-concurrency", discoveryMaxConcurrency.Default(), "Number of workers used for tablet discovery") fs.String("sqlite-data-file", sqliteDataFile.Default(), "SQLite Datafile to use as VTOrc's database") fs.Duration("instance-poll-time", instancePollTime.Default(), "Timer duration on which VTOrc refreshes MySQL information") fs.Duration("snapshot-topology-interval", snapshotTopologyInterval.Default(), "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours") From dab125d6a2aac8978ac30348d903deacc213136e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 13:48:06 +0100 Subject: [PATCH 08/14] fix typo Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtorc.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index e5b493af45c..f7270a75170 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -32,7 +32,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. - --discovery-max-concurrency int Number of workers for used for tablet discovery (default 300) + --discovery-max-concurrency int Number of workers used for tablet discovery (default 300) --emit_stats If set, emit stats to push-based monitoring and stats backends --enable-primary-disk-stalled-recovery Whether VTOrc should detect a stalled disk on the primary and failover --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) From a44370624c5eb30f3383aa3199b5ffb9b8e74d30 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 13:54:42 +0100 Subject: [PATCH 09/14] rename var Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 0b9eeff3cfd..be6c4865013 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -34,8 +34,8 @@ import ( // queueItem represents an item in the discovery.Queue. type queueItem struct { - CreatedAt time.Time - Key string + PushedAt time.Time + Key string } // Queue is an implementation of discovery.Queue. @@ -59,8 +59,8 @@ func (q *Queue) QueueLen() int { // processed; silently returns otherwise. func (q *Queue) Push(key string) { q.queue <- queueItem{ - CreatedAt: time.Now(), - Key: key, + PushedAt: time.Now(), + Key: key, } } @@ -69,7 +69,7 @@ func (q *Queue) Push(key string) { func (q *Queue) Consume() string { item := <-q.queue - timeOnQueue := time.Since(item.CreatedAt) + timeOnQueue := time.Since(item.PushedAt) if timeOnQueue > config.GetInstancePollTime() { log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) } From faff7e68b3aec92197f2b2ec6fb8900c611bf105 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 14:15:25 +0100 Subject: [PATCH 10/14] restore dupe detection Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 14 +++++++++++++- go/vt/vtorc/discovery/queue_test.go | 14 ++++++++++++++ go/vt/vtorc/logic/vtorc.go | 1 + 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index be6c4865013..92799410a4b 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -26,6 +26,7 @@ push() operation never blocks while pop() blocks on an empty queue. package discovery import ( + "sync" "time" "vitess.io/vitess/go/vt/log" @@ -40,7 +41,8 @@ type queueItem struct { // Queue is an implementation of discovery.Queue. type Queue struct { - queue chan queueItem + enqueued sync.Map + queue chan queueItem } // NewQueue creates a new queue. @@ -58,10 +60,14 @@ func (q *Queue) QueueLen() int { // Push enqueues a key if it is not on a queue and is not being // processed; silently returns otherwise. func (q *Queue) Push(key string) { + if _, found := q.enqueued.Load(key); found { + return + } q.queue <- queueItem{ PushedAt: time.Now(), Key: key, } + q.enqueued.Store(key, struct{}{}) } // Consume fetches a key to process; blocks if queue is empty. @@ -76,3 +82,9 @@ func (q *Queue) Consume() string { return item.Key } + +// Release removes a key from a list of being processed keys +// which allows that key to be pushed into the queue again. +func (q *Queue) Release(key string) { + q.enqueued.Delete(key) +} diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 0efbd047553..3eacf6fb365 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -29,8 +29,22 @@ func TestQueue(t *testing.T) { // Push q.Push(t.Name()) require.Equal(t, 1, q.QueueLen()) + _, found := q.enqueued.Load(t.Name()) + require.True(t, found) + + // Push duplicate + q.Push(t.Name()) + require.Equal(t, 1, q.QueueLen()) // Consume require.Equal(t, t.Name(), q.Consume()) require.Zero(t, q.QueueLen()) + _, found = q.enqueued.Load(t.Name()) + require.True(t, found) + + // Release + q.Release(t.Name()) + require.Zero(t, q.QueueLen()) + _, found = q.enqueued.Load(t.Name()) + require.False(t, found) } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index fe03edffe1c..79c22515e59 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -115,6 +115,7 @@ func handleDiscoveryRequests() { for { tabletAlias := discoveryQueue.Consume() DiscoverInstance(tabletAlias, false /* forceDiscovery */) + discoveryQueue.Release(tabletAlias) } }() } From 56e14d9acce82f63e8b63864c1c4536d7f2067aa Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 19 Feb 2025 14:19:48 +0100 Subject: [PATCH 11/14] fix comment Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index 92799410a4b..a682bf1290f 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -17,7 +17,7 @@ /* package discovery manages a queue of discovery requests: an ordered -queue. +queue with no duplicates. push() operation never blocks while pop() blocks on an empty queue. From 8b9bbb1df2385d57a03068270512affc196f1524 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 20 Feb 2025 13:45:15 +0100 Subject: [PATCH 12/14] tweaks Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 28 ++++++++++++++++++---- go/vt/vtorc/discovery/queue_test.go | 36 ++++++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index a682bf1290f..b6a15622613 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -41,17 +41,33 @@ type queueItem struct { // Queue is an implementation of discovery.Queue. type Queue struct { - enqueued sync.Map + mu sync.Mutex + enqueued map[string]struct{} queue chan queueItem } // NewQueue creates a new queue. func NewQueue() *Queue { return &Queue{ - queue: make(chan queueItem, config.DiscoveryQueueCapacity), + enqueued: make(map[string]struct{}), + queue: make(chan queueItem, config.DiscoveryQueueCapacity), } } +// setKeyCheckEnqueued returns true if a key is already enqueued, if +// not the key will be marked as enqueued and false is returned. +func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) { + q.mu.Lock() + defer q.mu.Unlock() + + if _, found := q.enqueued[key]; found { + alreadyEnqueued = true + } else { + q.enqueued[key] = struct{}{} + } + return alreadyEnqueued +} + // QueueLen returns the length of the queue. func (q *Queue) QueueLen() int { return len(q.queue) @@ -60,14 +76,13 @@ func (q *Queue) QueueLen() int { // Push enqueues a key if it is not on a queue and is not being // processed; silently returns otherwise. func (q *Queue) Push(key string) { - if _, found := q.enqueued.Load(key); found { + if q.setKeyCheckEnqueued(key) { return } q.queue <- queueItem{ PushedAt: time.Now(), Key: key, } - q.enqueued.Store(key, struct{}{}) } // Consume fetches a key to process; blocks if queue is empty. @@ -86,5 +101,8 @@ func (q *Queue) Consume() string { // Release removes a key from a list of being processed keys // which allows that key to be pushed into the queue again. func (q *Queue) Release(key string) { - q.enqueued.Delete(key) + q.mu.Lock() + defer q.mu.Unlock() + + delete(q.enqueued, key) } diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 3eacf6fb365..3936a72718c 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -17,6 +17,7 @@ limitations under the License. package discovery import ( + "strconv" "testing" "github.com/stretchr/testify/require" @@ -29,7 +30,7 @@ func TestQueue(t *testing.T) { // Push q.Push(t.Name()) require.Equal(t, 1, q.QueueLen()) - _, found := q.enqueued.Load(t.Name()) + _, found := q.enqueued[t.Name()] require.True(t, found) // Push duplicate @@ -39,12 +40,41 @@ func TestQueue(t *testing.T) { // Consume require.Equal(t, t.Name(), q.Consume()) require.Zero(t, q.QueueLen()) - _, found = q.enqueued.Load(t.Name()) + _, found = q.enqueued[t.Name()] require.True(t, found) // Release q.Release(t.Name()) require.Zero(t, q.QueueLen()) - _, found = q.enqueued.Load(t.Name()) + _, found = q.enqueued[t.Name()] require.False(t, found) } + +type testQueue interface { + QueueLen() int + Push(string) + Consume() string + Release(string) +} + +func BenchmarkQueues(b *testing.B) { + tests := []struct { + name string + queue testQueue + }{ + {"Current", NewQueue()}, + } + for _, test := range tests { + q := test.queue + b.Run(test.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + for i := 0; i < 1000; i++ { + q.Push(b.Name() + strconv.Itoa(i)) + } + for q.QueueLen() > 0 { + q.Release(q.Consume()) + } + } + }) + } +} From 503bf4211a3e8b1564ad889027ef6b825b6ad1be Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 20 Feb 2025 13:52:20 +0100 Subject: [PATCH 13/14] fix comment Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtorc/discovery/queue.go b/go/vt/vtorc/discovery/queue.go index b6a15622613..42b6516efdf 100644 --- a/go/vt/vtorc/discovery/queue.go +++ b/go/vt/vtorc/discovery/queue.go @@ -39,7 +39,7 @@ type queueItem struct { Key string } -// Queue is an implementation of discovery.Queue. +// Queue is an ordered queue with deduplication. type Queue struct { mu sync.Mutex enqueued map[string]struct{} From 45e87fd28570cf619d4c1d80d318b95286404095 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 20 Feb 2025 14:14:07 +0100 Subject: [PATCH 14/14] make QueueLen usage more realistic Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/discovery/queue_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vtorc/discovery/queue_test.go b/go/vt/vtorc/discovery/queue_test.go index 3936a72718c..f72c5721960 100644 --- a/go/vt/vtorc/discovery/queue_test.go +++ b/go/vt/vtorc/discovery/queue_test.go @@ -71,7 +71,8 @@ func BenchmarkQueues(b *testing.B) { for i := 0; i < 1000; i++ { q.Push(b.Name() + strconv.Itoa(i)) } - for q.QueueLen() > 0 { + q.QueueLen() + for i := 0; i < 1000; i++ { q.Release(q.Consume()) } }