Skip to content

Commit

Permalink
Improve tracking of apiInflight metric
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Jan 13, 2025
1 parent 631340b commit 64f03ef
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
7 changes: 5 additions & 2 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,16 @@ func (q *ipQueue[T]) size() uint64 {
}

// Empty the queue and consumes the notification signal if present.
// Returns the number of items that were drained from the queue.
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue[T]) drain() {
func (q *ipQueue[T]) drain() int {
if q == nil {
return
return 0
}
q.Lock()
olen := len(q.elts) - q.pos
q.elts, q.pos, q.sz = nil, 0, 0
// Consume the signal if it was present to reduce the chance of a reader
// routine to be think that there is something in the queue...
Expand All @@ -299,6 +301,7 @@ func (q *ipQueue[T]) drain() {
default:
}
q.Unlock()
return olen
}

// Since the length of the queue goes to 0 after a pop(), it is good to
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,13 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
limit := atomic.LoadInt64(&js.queueLimit)
retry:
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
atomic.AddInt64(&js.apiInflight, 1)
if pending >= int(limit) {
if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
// If we were able to take one of the oldest items off the queue, then
// retry the insert.
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
atomic.AddInt64(&js.apiInflight, -1)
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Type: JSAPILimitReachedAdvisoryType,
Expand All @@ -911,7 +913,7 @@ retry:
// then something is wrong for us to be both over the limit but unable to pull entries, so
// throw everything away and hope we recover from it.
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()
atomic.AddInt64(&js.apiInflight, -int64(s.jsAPIRoutedReqs.drain()))

s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Expand All @@ -923,8 +925,6 @@ retry:
Domain: js.config.Domain,
Dropped: int64(pending),
})
} else {
atomic.StoreInt64(&js.apiInflight, int64(pending))
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1962,7 +1962,7 @@ runner:
// just will remove them from the central monitoring map
queues := []interface {
unregister()
drain()
drain() int
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.drain()
Expand Down

0 comments on commit 64f03ef

Please sign in to comment.