Skip to content

Commit

Permalink
Fix data race in user list of a queue (#6160)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinjung04 authored Aug 13, 2024
1 parent 6509fb2 commit ee8f8e9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
* [BUGFIX] Ingester: Fix issue with the minimize token generator where it was not taking in consideration the current ownerhip of an instance when generating extra tokens. #6062
* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077
* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077 #6160
* [BUGFIX] Ingester: Include out-of-order head compaction when compacting TSDB head. #6108
* [BUGFIX] Ingester: Fix `cortex_ingester_tsdb_mmap_chunks_total` metric. #6134
* [BUGFIX] Query Frontend: Fix query rejection bug for metadata queries. #6143
Expand Down
21 changes: 10 additions & 11 deletions pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ type querier struct {
// This struct holds user queues for pending requests. It also keeps track of connected queriers,
// and mapping between users and queriers.
type queues struct {
userQueues map[string]*userQueue
userQueuesMx sync.RWMutex

// List of all users with queues, used for iteration when searching for next queue to handle.
// Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink
// this list when there are ""'s at the end of it.
users []string
users []string
userQueues map[string]*userQueue
queuesMx sync.RWMutex

// How long to wait before removing a querier which has got disconnected
// but hasn't notified about a graceful shutdown.
Expand Down Expand Up @@ -102,8 +101,8 @@ func (q *queues) len() int {
}

func (q *queues) deleteQueue(userID string) {
q.userQueuesMx.Lock()
defer q.userQueuesMx.Unlock()
q.queuesMx.Lock()
defer q.queuesMx.Unlock()

uq := q.userQueues[userID]
if uq == nil {
Expand Down Expand Up @@ -134,8 +133,8 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
maxQueriers = 0
}

q.userQueuesMx.Lock()
defer q.userQueuesMx.Unlock()
q.queuesMx.Lock()
defer q.queuesMx.Unlock()

uq := q.userQueues[userID]
priorityEnabled := q.limits.QueryPriority(userID).Enabled
Expand Down Expand Up @@ -222,6 +221,9 @@ func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (userRequestQueue, string, int) {
uid := lastUserIndex

q.queuesMx.RLock()
defer q.queuesMx.RUnlock()

for iters := 0; iters < len(q.users); iters++ {
uid = uid + 1

Expand All @@ -236,9 +238,6 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us
continue
}

q.userQueuesMx.RLock()
defer q.userQueuesMx.RUnlock()

uq := q.userQueues[u]

if uq.queriers != nil {
Expand Down

0 comments on commit ee8f8e9

Please sign in to comment.