-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
vtorc
: cleanup discover queue, add concurrency flag
#17825
base: main
Are you sure you want to change the base?
Changes from all commits
2a0ffd4
1b5fc79
727024b
b5631c9
eacf71d
bffb1f8
49edc9e
dab125d
a443706
faff7e6
56e14d9
8b9bbb1
503bf42
45e87fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,85 +33,76 @@ import ( | |
"vitess.io/vitess/go/vt/vtorc/config" | ||
) | ||
|
||
// Queue contains information for managing discovery requests | ||
type Queue struct { | ||
sync.Mutex | ||
// queueItem represents an item in the discovery.Queue. | ||
type queueItem struct { | ||
PushedAt time.Time | ||
Key string | ||
} | ||
|
||
name string | ||
done chan struct{} | ||
queue chan string | ||
queuedKeys map[string]time.Time | ||
consumedKeys map[string]time.Time | ||
// Queue is an ordered queue with deduplication. | ||
type Queue struct { | ||
mu sync.Mutex | ||
enqueued map[string]struct{} | ||
queue chan queueItem | ||
} | ||
|
||
// CreateQueue allows for creation of a new discovery queue | ||
func CreateQueue(name string) *Queue { | ||
// NewQueue creates a new queue. | ||
func NewQueue() *Queue { | ||
return &Queue{ | ||
name: name, | ||
queuedKeys: make(map[string]time.Time), | ||
consumedKeys: make(map[string]time.Time), | ||
queue: make(chan string, config.DiscoveryQueueCapacity), | ||
enqueued: make(map[string]struct{}), | ||
queue: make(chan queueItem, config.DiscoveryQueueCapacity), | ||
} | ||
} | ||
|
||
// QueueLen returns the length of the queue (channel size + queued size) | ||
func (q *Queue) QueueLen() int { | ||
q.Lock() | ||
defer q.Unlock() | ||
// setKeyCheckEnqueued returns true if a key is already enqueued, if | ||
// not the key will be marked as enqueued and false is returned. | ||
func (q *Queue) setKeyCheckEnqueued(key string) (alreadyEnqueued bool) { | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
|
||
return len(q.queue) + len(q.queuedKeys) | ||
if _, found := q.enqueued[key]; found { | ||
alreadyEnqueued = true | ||
} else { | ||
q.enqueued[key] = struct{}{} | ||
} | ||
return alreadyEnqueued | ||
} | ||
|
||
// QueueLen returns the length of the queue. | ||
func (q *Queue) QueueLen() int { | ||
return len(q.queue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous code did If we see a reason I can restore this but it adds a lock. I think only metrics call this lock There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A lock might be required to prevent a data race (not that its important, but the tests will complain and its going to be annoying for the CI) |
||
} | ||
|
||
// Push enqueues a key if it is not on a queue and is not being | ||
// processed; silently returns otherwise. | ||
func (q *Queue) Push(key string) { | ||
q.Lock() | ||
defer q.Unlock() | ||
|
||
// is it enqueued already? | ||
if _, found := q.queuedKeys[key]; found { | ||
if q.setKeyCheckEnqueued(key) { | ||
return | ||
} | ||
|
||
// is it being processed now? | ||
if _, found := q.consumedKeys[key]; found { | ||
return | ||
q.queue <- queueItem{ | ||
PushedAt: time.Now(), | ||
Key: key, | ||
} | ||
|
||
q.queuedKeys[key] = time.Now() | ||
q.queue <- key | ||
} | ||
|
||
// Consume fetches a key to process; blocks if queue is empty. | ||
// Release must be called once after Consume. | ||
func (q *Queue) Consume() string { | ||
q.Lock() | ||
queue := q.queue | ||
q.Unlock() | ||
item := <-q.queue | ||
|
||
key := <-queue | ||
|
||
q.Lock() | ||
defer q.Unlock() | ||
|
||
// alarm if have been waiting for too long | ||
timeOnQueue := time.Since(q.queuedKeys[key]) | ||
timeOnQueue := time.Since(item.PushedAt) | ||
if timeOnQueue > config.GetInstancePollTime() { | ||
log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds()) | ||
log.Warningf("key %v spent %.4fs waiting on a discovery queue", item.Key, timeOnQueue.Seconds()) | ||
} | ||
|
||
q.consumedKeys[key] = q.queuedKeys[key] | ||
|
||
delete(q.queuedKeys, key) | ||
|
||
return key | ||
return item.Key | ||
} | ||
|
||
// Release removes a key from a list of being processed keys | ||
// which allows that key to be pushed into the queue again. | ||
func (q *Queue) Release(key string) { | ||
q.Lock() | ||
defer q.Unlock() | ||
q.mu.Lock() | ||
defer q.mu.Unlock() | ||
|
||
delete(q.consumedKeys, key) | ||
delete(q.enqueued, key) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
Copyright 2025 The Vitess Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package discovery | ||
|
||
import ( | ||
"strconv" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestQueue(t *testing.T) { | ||
q := NewQueue() | ||
require.Zero(t, q.QueueLen()) | ||
|
||
// Push | ||
q.Push(t.Name()) | ||
require.Equal(t, 1, q.QueueLen()) | ||
_, found := q.enqueued[t.Name()] | ||
require.True(t, found) | ||
|
||
// Push duplicate | ||
q.Push(t.Name()) | ||
require.Equal(t, 1, q.QueueLen()) | ||
|
||
// Consume | ||
require.Equal(t, t.Name(), q.Consume()) | ||
require.Zero(t, q.QueueLen()) | ||
_, found = q.enqueued[t.Name()] | ||
require.True(t, found) | ||
|
||
// Release | ||
q.Release(t.Name()) | ||
require.Zero(t, q.QueueLen()) | ||
_, found = q.enqueued[t.Name()] | ||
require.False(t, found) | ||
} | ||
|
||
type testQueue interface { | ||
QueueLen() int | ||
Push(string) | ||
Consume() string | ||
Release(string) | ||
} | ||
|
||
func BenchmarkQueues(b *testing.B) { | ||
tests := []struct { | ||
name string | ||
queue testQueue | ||
}{ | ||
{"Current", NewQueue()}, | ||
} | ||
for _, test := range tests { | ||
q := test.queue | ||
b.Run(test.name, func(b *testing.B) { | ||
for i := 0; i < b.N; i++ { | ||
for i := 0; i < 1000; i++ { | ||
q.Push(b.Name() + strconv.Itoa(i)) | ||
} | ||
q.QueueLen() | ||
for i := 0; i < 1000; i++ { | ||
q.Release(q.Consume()) | ||
} | ||
} | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably shouldn't be a dynamic variable since we don't change the discovery queue capacity after its been initialized.