diff --git a/server/ipqueue.go b/server/ipqueue.go index e33d01c885..40992d7867 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -283,14 +283,16 @@ func (q *ipQueue[T]) size() uint64 { } // Empty the queue and consumes the notification signal if present. +// Returns the number of items that were drained from the queue. // Note that this could cause a reader go routine that has been // notified that there is something in the queue (reading from queue's `ch`) // may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`. -func (q *ipQueue[T]) drain() { +func (q *ipQueue[T]) drain() int { if q == nil { - return + return 0 } q.Lock() + olen := len(q.elts) - q.pos q.elts, q.pos, q.sz = nil, 0, 0 // Consume the signal if it was present to reduce the chance of a reader // routine to be think that there is something in the queue... @@ -299,6 +301,7 @@ func (q *ipQueue[T]) drain() { default: } q.Unlock() + return olen } // Since the length of the queue goes to 0 after a pop(), it is good to diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 5b27dde28c..8a243d65d1 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -889,11 +889,13 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub limit := atomic.LoadInt64(&js.queueLimit) retry: pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) + atomic.AddInt64(&js.apiInflight, 1) if pending >= int(limit) { if _, ok := s.jsAPIRoutedReqs.popOne(); ok { // If we were able to take one of the oldest items off the queue, then // retry the insert. s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request") + atomic.AddInt64(&js.apiInflight, -1) s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ TypedEvent: TypedEvent{ Type: JSAPILimitReachedAdvisoryType, @@ -911,7 +913,7 @@ retry: // then something is wrong for us to be both over the limit but unable to pull entries, so // throw everything away and hope we recover from it. s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) - s.jsAPIRoutedReqs.drain() + atomic.AddInt64(&js.apiInflight, -int64(s.jsAPIRoutedReqs.drain())) s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{ TypedEvent: TypedEvent{ @@ -923,8 +925,6 @@ retry: Domain: js.config.Domain, Dropped: int64(pending), }) - } else { - atomic.StoreInt64(&js.apiInflight, int64(pending)) } } diff --git a/server/raft.go b/server/raft.go index c1102937bf..5ab37facfd 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1962,7 +1962,7 @@ runner: // just will remove them from the central monitoring map queues := []interface { unregister() - drain() + drain() int }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply} for _, q := range queues { q.drain()