JetStream Consumer num_ack_pending never becomes zero after application terminates #6093
Comments
Hey, I wrote a small test that reproduces the issue. Looking into the code, it seems that when the message is added to the redelivery list, it is still in the pending list (and its timeout is also updated).
func TestConsumerNumAckPending(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
AckWait: 2 * time.Second,
})
require_NoError(t, err)
for i := 0; i < 5; i++ {
_, err := js.Publish("test", nil)
require_NoError(t, err)
}
info, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_Equal(t, info.NumAckPending, 0)
require_Equal(t, info.NumPending, 5)
req := JSApiConsumerGetNextRequest{Batch: 1}
reqb, _ := json.Marshal(req)
sub := natsSubSync(t, nc, nats.NewInbox())
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)
_ = natsNexMsg(t, sub, time.Second)
info, err = js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_Equal(t, info.NumAckPending, 1)
require_Equal(t, info.NumPending, 4)
checkFor(t, 10*time.Second, 1*time.Second, func() error {
info, err := js.ConsumerInfo("TEST", "dur")
if err != nil {
return err
}
if info.NumAckPending != 0 {
return fmt.Errorf("Expected no ack pending, got %d", info.NumAckPending)
}
if info.NumPending != 5 {
return fmt.Errorf("Expected 5 pending, got %d", info.NumPending)
}
return nil
})
} |
What server version? |
If there is no max redelivery limit set and you are using a pull consumer that never requests any more messages, that is correct: num_ack_pending will not go down. |
The test is failing with the "main" branch. I saw that the seq is added to the RedeliveryQueue but not removed from Pending, and when it is sent to the consumer, the timeout is set again. |
What server version are you running? We did improve the num_pending when the consumer is a pull based consumer and no additional requests for messages are received. |
@davidmcote 2.10.18. The test that I posted also fails on current "main":
$ git log -1 | cat
commit 32c26d6bdb30b7ac0f5921fdfa0bf018c285cd3c
Merge: 869d3ad2 695cec9f
Author: Derek Collison <[email protected]>
Date: Wed Nov 20 13:09:35 2024 -0800
$ go test -timeout 60s -run ^TestConsumerNumAckPending$ github.com/nats-io/nats-server/v2/server -count=1 -v -race
=== RUN TestConsumerNumAckPending
jetstream_consumer_test.go:2583: Expected no ack pending, got 1
--- FAIL: TestConsumerNumAckPending (10.07s)
FAIL
FAIL github.com/nats-io/nats-server/v2/server 11.087s
FAIL |
If the system can still try to deliver, num ack pending will stay fixed until the message is acked or max deliveries is hit. |
This indeed sounds like it's intended behaviour. Num ack pending only goes down if:
|
I was doing some tests with NakWithDelay. If the number of messages NAK'd with NakWithDelay equals MaxAckPending, those messages stay in the pending hashmap, which prevents fetching another message.
func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// ...
// Check if we have max pending.
if o.maxp > 0 && len(o.pending) >= o.maxp {
// maxp only set when ack policy != AckNone and user set MaxAckPending
// Stall if we have hit max pending.
return nil, 0, errMaxAckPending
}
// ...
I have written a test that fails on the last line, as I was expecting to be allowed to continue consuming other messages:
func TestConsumerNumAckPendingWithNakDelay(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
AckWait: 2 * time.Second,
MaxAckPending: 2,
})
require_NoError(t, err)
for i := 0; i < 5; i++ {
_, err := js.Publish("test", nil)
require_NoError(t, err)
}
info, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_Equal(t, info.NumAckPending, 0)
require_Equal(t, info.NumPending, 5)
req := JSApiConsumerGetNextRequest{Batch: 1}
reqb, _ := json.Marshal(req)
sub := natsSubSync(t, nc, nats.NewInbox())
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)
msg := natsNexMsg(t, sub, time.Second)
err = msg.NakWithDelay(time.Minute)
require_NoError(t, err)
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)
msg = natsNexMsg(t, sub, time.Second)
err = msg.NakWithDelay(time.Minute)
require_NoError(t, err)
info, err = js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_Equal(t, info.NumAckPending, 2)
require_Equal(t, info.NumPending, 3)
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)
// Fails because the server never responds as getNextMsg() returns errMaxAckPending
_ = natsNexMsg(t, sub, time.Second)
} |
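The stall in the last line of the test can be reproduced with a small stand-alone sketch. This is a toy model under stated assumptions, not server code: `gatedConsumer`, `nakWithDelay` and `ack` are hypothetical names, and only the `maxp` check is taken from the `getNextMsg` excerpt quoted above.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errMaxAckPending = errors.New("max ack pending reached")
	errNoMessages    = errors.New("no more messages")
)

// gatedConsumer is a hypothetical toy model, not the nats-server type.
type gatedConsumer struct {
	maxp    int                 // MaxAckPending; 0 means no limit
	pending map[uint64]struct{} // delivered but not acked, including delayed NAKs
	delayed map[uint64]bool     // sequences waiting for a delayed redelivery
	next    uint64              // next stream sequence to deliver
	last    uint64              // last sequence in the stream
}

func newGatedConsumer(maxp int, last uint64) *gatedConsumer {
	return &gatedConsumer{
		maxp:    maxp,
		pending: make(map[uint64]struct{}),
		delayed: make(map[uint64]bool),
		next:    1,
		last:    last,
	}
}

// getNext mirrors the check quoted above: stall once pending reaches maxp.
func (c *gatedConsumer) getNext() (uint64, error) {
	if c.maxp > 0 && len(c.pending) >= c.maxp {
		return 0, errMaxAckPending
	}
	if c.next > c.last {
		return 0, errNoMessages
	}
	seq := c.next
	c.next++
	c.pending[seq] = struct{}{}
	return seq, nil
}

// nakWithDelay marks seq for later redelivery. The message deliberately
// stays in pending, which is the behaviour described in this thread.
func (c *gatedConsumer) nakWithDelay(seq uint64) { c.delayed[seq] = true }

// ack removes seq from pending and frees a slot.
func (c *gatedConsumer) ack(seq uint64) {
	delete(c.pending, seq)
	delete(c.delayed, seq)
}

func main() {
	c := newGatedConsumer(2, 5) // MaxAckPending: 2, five messages in the stream
	for i := 0; i < 2; i++ {
		seq, _ := c.getNext()
		c.nakWithDelay(seq)
	}
	_, err := c.getNext()
	fmt.Println(err) // prints "max ack pending reached"

	c.ack(1)
	seq, err := c.getNext()
	fmt.Println(seq, err) // prints "3 <nil>"
}
```

In this model a delayed NAK never frees a slot; only an ack does, so the third request stalls until one of the first two messages is acknowledged.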
Correct, that's the intended behaviour. With two slight modifications to your test, it will start to pass:
err = msg.NakWithDelay(4*time.Second)
_ = natsNexMsg(t, sub, 5*time.Second)
Could also leave the If you'd want to get the third message, while the first two messages are scheduled for redelivery after a minute. Then you'll need to up the |
Thank you for the explanation. This issue can be closed if this is the expected behaviour. |
Observed behavior
If an application terminates before acking its messages from a durable JetStream Consumer and no further batches of messages are requested from the Consumer, num_ack_pending in ConsumerInfo never becomes zero.
Expected behavior
num_ack_pending should become zero after the JetStream Consumer's AckWait period expires on all outstanding messages.
Server and client version
nats-server 2.10.18
Host environment
Any. Apple M3 Pro.
Steps to reproduce
1. Create a stream and durable consumer with explicit ack and AckWait.
2. Receive a message without acknowledging it.
3. Poll ConsumerInfo.
Notice that last activity is greater than the ack wait period yet num_ack_pending still shows there's an outstanding message.
The message is immediately ready for redelivery if another request comes along to pull messages.
It seems the ConsumerInfo counters are only advanced by requests which mutate the underlying stream.
As a workaround, I tried to request a zero-sized batch.
natscli appears to no-op on the client side for such a silly request.
I crafted the batch request manually, but nats-server clamps my batchsize back up to 1 and this counts against the delivery count of the sacrificial message.