Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vtorc: cleanup discover queue, add concurrency flag #17825

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--discovery-max-concurrency int Number of workers used for tablet discovery (default 300)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-primary-disk-stalled-recovery Whether VTOrc should detect a stalled disk on the primary and failover
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
Expand Down
17 changes: 16 additions & 1 deletion go/vt/vtorc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const (
AuditPageSize = 20
DebugMetricsIntervalSeconds = 10
StaleInstanceCoordinatesExpireSeconds = 60
DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery
DiscoveryQueueCapacity = 100000
DiscoveryQueueMaxStatisticsSize = 120
DiscoveryCollectionRetentionSeconds = 120
Expand All @@ -58,6 +57,15 @@ var (
},
)

discoveryMaxConcurrency = viperutil.Configure(
"discovery-max-concurrency",
viperutil.Options[int]{
FlagName: "discovery-max-concurrency",
Default: 300,
Dynamic: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably shouldn't be a dynamic variable, since we don't change the discovery queue capacity after it has been initialized.

},
)

sqliteDataFile = viperutil.Configure(
"sqlite-data-file",
viperutil.Options[string]{
Expand Down Expand Up @@ -191,6 +199,7 @@ func init() {

// registerFlags registers the flags required by VTOrc
func registerFlags(fs *pflag.FlagSet) {
fs.Int("discovery-max-concurrency", discoveryMaxConcurrency.Default(), "Number of workers used for tablet discovery")
fs.String("sqlite-data-file", sqliteDataFile.Default(), "SQLite Datafile to use as VTOrc's database")
fs.Duration("instance-poll-time", instancePollTime.Default(), "Timer duration on which VTOrc refreshes MySQL information")
fs.Duration("snapshot-topology-interval", snapshotTopologyInterval.Default(), "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours")
Expand All @@ -211,6 +220,7 @@ func registerFlags(fs *pflag.FlagSet) {
viperutil.BindFlags(fs,
instancePollTime,
preventCrossCellFailover,
discoveryMaxConcurrency,
sqliteDataFile,
snapshotTopologyInterval,
reasonableReplicationLag,
Expand Down Expand Up @@ -248,6 +258,11 @@ func GetPreventCrossCellFailover() bool {
return preventCrossCellFailover.Get()
}

// GetDiscoveryMaxConcurrency is a getter function. It returns the number of
// workers to use for tablet discovery.
//
// The setting is stored as a signed int, so a negative value would wrap
// around to an enormous unsigned number when converted to uint. Values below
// 1 are therefore treated as invalid and the registered default is returned
// instead.
func GetDiscoveryMaxConcurrency() uint {
	workers := discoveryMaxConcurrency.Get()
	if workers < 1 {
		// Never hand a wrapped-around (or zero) worker count to callers.
		workers = discoveryMaxConcurrency.Default()
	}
	return uint(workers)
}

// GetSQLiteDataFile is a getter function.
func GetSQLiteDataFile() string {
return sqliteDataFile.Get()
Expand Down
91 changes: 41 additions & 50 deletions go/vt/vtorc/discovery/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,85 +33,76 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
)

// Queue contains information for managing discovery requests
type Queue struct {
sync.Mutex
// queueItem represents an item in the discovery.Queue.
type queueItem struct {
PushedAt time.Time
Key string
}

name string
done chan struct{}
queue chan string
queuedKeys map[string]time.Time
consumedKeys map[string]time.Time
// Queue is an ordered queue with deduplication.
type Queue struct {
mu sync.Mutex
enqueued map[string]struct{}
queue chan queueItem
}

// CreateQueue allows for creation of a new discovery queue
func CreateQueue(name string) *Queue {
// NewQueue creates a new queue.
func NewQueue() *Queue {
return &Queue{
name: name,
queuedKeys: make(map[string]time.Time),
consumedKeys: make(map[string]time.Time),
queue: make(chan string, config.DiscoveryQueueCapacity),
enqueued: make(map[string]struct{}),
queue: make(chan queueItem, config.DiscoveryQueueCapacity),
}
}

// QueueLen returns the length of the queue (channel size + queued size)
func (q *Queue) QueueLen() int {
q.Lock()
defer q.Unlock()
// setKeyCheckEnqueued returns true if a key is already enqueued; if
// not, the key is marked as enqueued and false is returned.
func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) {
q.mu.Lock()
defer q.mu.Unlock()

return len(q.queue) + len(q.queuedKeys)
if _, found := q.enqueued[key]; found {
alreadyEnqueued = true
} else {
q.enqueued[key] = struct{}{}
}
return alreadyEnqueued
}

// QueueLen returns the length of the queue.
func (q *Queue) QueueLen() int {
return len(q.queue)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous code did return len(q.queue) + len(q.queuedKeys) and I'm not sure why 🤔

If we see a reason, I can restore this, but it adds a lock. I think only metrics call this function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lock might be required to prevent a data race (not that it's important, but the tests will complain and it's going to be annoying for the CI)

}

// Push enqueues a key if it is not on a queue and is not being
// processed; silently returns otherwise.
func (q *Queue) Push(key string) {
q.Lock()
defer q.Unlock()

// is it enqueued already?
if _, found := q.queuedKeys[key]; found {
if q.setKeyCheckEnqueued(key) {
return
}

// is it being processed now?
if _, found := q.consumedKeys[key]; found {
return
q.queue <- queueItem{
PushedAt: time.Now(),
Key: key,
}

q.queuedKeys[key] = time.Now()
q.queue <- key
}

// Consume fetches a key to process; blocks if queue is empty.
// Release must be called once after Consume.
func (q *Queue) Consume() string {
q.Lock()
queue := q.queue
q.Unlock()
item := <-q.queue

key := <-queue

q.Lock()
defer q.Unlock()

// alarm if have been waiting for too long
timeOnQueue := time.Since(q.queuedKeys[key])
timeOnQueue := time.Since(item.PushedAt)
if timeOnQueue > config.GetInstancePollTime() {
log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds())
log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds())
}

q.consumedKeys[key] = q.queuedKeys[key]

delete(q.queuedKeys, key)

return key
return item.Key
}

// Release removes a key from a list of being processed keys
// which allows that key to be pushed into the queue again.
func (q *Queue) Release(key string) {
q.Lock()
defer q.Unlock()
q.mu.Lock()
defer q.mu.Unlock()

delete(q.consumedKeys, key)
delete(q.enqueued, key)
}
81 changes: 81 additions & 0 deletions go/vt/vtorc/discovery/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

// TestQueue walks a single key through the full queue lifecycle: push,
// duplicate push, consume and release. After every step it checks both the
// reported queue length and whether the key is still tracked in the
// deduplication set.
func TestQueue(t *testing.T) {
	key := t.Name()
	queue := NewQueue()

	// isTracked reports whether the key is present in the deduplication set.
	isTracked := func() bool {
		_, ok := queue.enqueued[key]
		return ok
	}

	// A freshly created queue is empty.
	require.Zero(t, queue.QueueLen())

	// The first push enqueues the key and starts tracking it.
	queue.Push(key)
	require.Equal(t, 1, queue.QueueLen())
	require.True(t, isTracked())

	// Pushing the same key again must not grow the queue.
	queue.Push(key)
	require.Equal(t, 1, queue.QueueLen())

	// Consuming drains the queue but keeps the key tracked until Release.
	require.Equal(t, key, queue.Consume())
	require.Zero(t, queue.QueueLen())
	require.True(t, isTracked())

	// Releasing stops tracking the key.
	queue.Release(key)
	require.Zero(t, queue.QueueLen())
	require.False(t, isTracked())
}

// testQueue is the set of queue operations exercised by BenchmarkQueues.
// Any queue implementation providing these methods can be added to the
// benchmark table.
type testQueue interface {
	// QueueLen reports the queue's length.
	QueueLen() int
	// Push adds key to the queue.
	Push(key string)
	// Consume takes a key off the queue and returns it.
	Consume() string
	// Release is called with a key previously returned by Consume.
	Release(key string)
}

// BenchmarkQueues measures a full queue cycle for every implementation in
// the table: push a batch of distinct keys, read the queue length once, then
// consume and release the whole batch.
func BenchmarkQueues(b *testing.B) {
	// batchSize is the number of keys pushed and drained per iteration.
	const batchSize = 1000

	benchmarks := []struct {
		name  string
		queue testQueue
	}{
		{name: "Current", queue: NewQueue()},
	}

	for _, bm := range benchmarks {
		queue := bm.queue
		b.Run(bm.name, func(b *testing.B) {
			for n := 0; n < b.N; n++ {
				// Fill the queue with keys unique to this sub-benchmark.
				for k := 0; k < batchSize; k++ {
					queue.Push(b.Name() + strconv.Itoa(k))
				}
				queue.QueueLen()
				// Drain everything so the next iteration starts empty.
				for k := 0; k < batchSize; k++ {
					queue.Release(queue.Consume())
				}
			}
		})
	}
}
4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func waitForLocksRelease() {
// handleDiscoveryRequests iterates the discoveryQueue channel and calls upon
// instance discovery per entry.
func handleDiscoveryRequests() {
discoveryQueue = discovery.CreateQueue("DEFAULT")
discoveryQueue = discovery.NewQueue()
// create a pool of discovery workers
for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ {
for i := uint(0); i < config.GetDiscoveryMaxConcurrency(); i++ {
go func() {
for {
tabletAlias := discoveryQueue.Consume()
Expand Down
Loading