Skip to content

Commit

Permalink
Remove unused code in discovery queue creation (vitessio#17515)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jan 14, 2025
1 parent a9d6969 commit fd0ffeb
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 61 deletions.
63 changes: 3 additions & 60 deletions go/vt/vtorc/discovery/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
)

// QueueMetric contains the queue's active and queued sizes
type QueueMetric struct {
Active int
Queued int
}

// Queue contains information for managing discovery requests
type Queue struct {
sync.Mutex
Expand All @@ -48,67 +42,16 @@ type Queue struct {
queue chan string
queuedKeys map[string]time.Time
consumedKeys map[string]time.Time
metrics []QueueMetric
}

// DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring.
// Currently this is accessed by ContinuousDiscovery() but also from http api calls.
// I may need to protect this better?
var discoveryQueue map[string](*Queue)
var dcLock sync.Mutex

func init() {
discoveryQueue = make(map[string](*Queue))
}

// CreateOrReturnQueue allows for creation of a new discovery queue or
// returning a pointer to an existing one given the name.
func CreateOrReturnQueue(name string) *Queue {
dcLock.Lock()
defer dcLock.Unlock()
if q, found := discoveryQueue[name]; found {
return q
}

q := &Queue{
// CreateQueue allows for creation of a new discovery queue
func CreateQueue(name string) *Queue {
return &Queue{
name: name,
queuedKeys: make(map[string]time.Time),
consumedKeys: make(map[string]time.Time),
queue: make(chan string, config.DiscoveryQueueCapacity),
}
go q.startMonitoring()

discoveryQueue[name] = q

return q
}

// monitoring queue sizes until we are told to stop
func (q *Queue) startMonitoring() {
log.Infof("Queue.startMonitoring(%s)", q.name)
ticker := time.NewTicker(time.Second) // hard-coded at every second

for {
select {
case <-ticker.C: // do the periodic expiry
q.collectStatistics()
case <-q.done:
return
}
}
}

// do a check of the entries in the queue, both those active and queued
func (q *Queue) collectStatistics() {
q.Lock()
defer q.Unlock()

q.metrics = append(q.metrics, QueueMetric{Queued: len(q.queuedKeys), Active: len(q.consumedKeys)})

// remove old entries if we get too big
if len(q.metrics) > config.DiscoveryQueueMaxStatisticsSize {
q.metrics = q.metrics[len(q.metrics)-config.DiscoveryQueueMaxStatisticsSize:]
}
}

// QueueLen returns the length of the queue (channel size + queued size)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func waitForLocksRelease() {
// handleDiscoveryRequests iterates the discoveryQueue channel and calls upon
// instance discovery per entry.
func handleDiscoveryRequests() {
discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT")
discoveryQueue = discovery.CreateQueue("DEFAULT")
// create a pool of discovery workers
for i := uint(0); i < config.DiscoveryMaxConcurrency; i++ {
go func() {
Expand Down

0 comments on commit fd0ffeb

Please sign in to comment.