JetStream Consumer num_ack_pending never becomes zero after application terminates #6093

Open
davidmcote opened this issue Nov 8, 2024 · 11 comments
Labels: defect (Suspected defect such as a bug or regression)

Comments

@davidmcote
Contributor

Observed behavior

If an application terminates before acking its messages from a durable JetStream Consumer and no further batches of messages are requested from the Consumer, num_ack_pending in ConsumerInfo never becomes zero.

Expected behavior

num_ack_pending should become zero after the JetStream Consumer's AckWait period expires on all outstanding messages.

Server and client version

nats-server 2.10.18

Host environment

Any. Apple M3 Pro.

Steps to reproduce

1. Create a stream and durable consumer w/ explicit ack & ackwait.

% nats stream add mystream
? Subjects mysubject
? Storage memory
? Replication 1
? Retention Policy Interest
? Discard Policy New
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream mystream was created

Information for Stream mystream created 2024-11-08 08:17:33

              Subjects: mysubject
              Replicas: 1
               Storage: Memory

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: New
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 0
         Last Sequence: 0
      Active Consumers: 0


% nats consumer add
? Consumer name myconsumer
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all)
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
? Select a Stream mystream
Information for Consumer mystream > myconsumer created 2024-11-08T08:18:41-05:00

Configuration:

                    Name: myconsumer
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512

2. Receive a message without acknowledging it.

% nats req mysubject body
08:20:00 Sending request on "mysubject"
08:20:00 Received with rtt 371.167µs
{"stream":"mystream", "seq":1}


% nats consumer next --no-ack mystream myconsumer
[08:20:14] subj: mysubject / tries: 1 / cons seq: 1 / str seq: 1 / pending: 0

body

3. Poll ConsumerInfo.

Notice that last activity is greater than the ack wait period yet num_ack_pending still shows there's an outstanding message.

% nats consumer info mystream myconsumer
Information for Consumer mystream > myconsumer created 2024-11-08T08:18:41-05:00

Configuration:

                    Name: myconsumer
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 1 Stream sequence: 1 Last delivery: 39.50s ago
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 1 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512

The message is immediately ready for redelivery if another request comes along to pull messages.

 % nats consumer next --no-ack mystream myconsumer
[08:22:40] subj: mysubject / tries: 2 / cons seq: 2 / str seq: 1 / pending: 0

body

It seems the ConsumerInfo counters are only advanced by requests which mutate the underlying stream.

As a workaround, I tried to request a zero-sized batch.

natscli appears to no-op on the client side for such a silly request.

% nats consumer next --no-ack mystream myconsumer --count 0

I crafted the batch request manually, but nats-server clamps my batch size back up to 1, and this counts against the delivery count of the sacrificial message.

% nats req '$JS.API.CONSUMER.MSG.NEXT.mystream.myconsumer' '{"expires":5000000000,"batch":0}'
08:25:23 Sending request on "$JS.API.CONSUMER.MSG.NEXT.mystream.myconsumer"
08:25:23 Received with rtt 401.791µs
body
@davidmcote added the defect label on Nov 8, 2024
@davidmcote changed the title from "JetStream durable consumer num_ack_pending never becomes zero after application terminates" to "JetStream Consumer num_ack_pending never becomes zero after application terminates" on Nov 8, 2024

@ramonberrutti
Contributor

ramonberrutti commented Nov 20, 2024

Hey, I wrote a small test that reproduces the issue.

Looking into the code, it seems that when the message is added to the redelivery list, it is still in the pending list (the timeout is also updated):

func TestConsumerNumAckPending(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"test"},
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "dur",
		AckPolicy: nats.AckExplicitPolicy,
		AckWait:   2 * time.Second,
	})
	require_NoError(t, err)

	for i := 0; i < 5; i++ {
		_, err := js.Publish("test", nil)
		require_NoError(t, err)
	}

	info, err := js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)

	require_Equal(t, info.NumAckPending, 0)
	require_Equal(t, info.NumPending, 5)

	req := JSApiConsumerGetNextRequest{Batch: 1}
	reqb, _ := json.Marshal(req)

	sub := natsSubSync(t, nc, nats.NewInbox())
	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)

	_ = natsNexMsg(t, sub, time.Second)

	info, err = js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)

	require_Equal(t, info.NumAckPending, 1)
	require_Equal(t, info.NumPending, 4)

	checkFor(t, 10*time.Second, 1*time.Second, func() error {
		info, err := js.ConsumerInfo("TEST", "dur")
		if err != nil {
			return err
		}

		if info.NumAckPending != 0 {
			return fmt.Errorf("Expected no ack pending, got %d", info.NumAckPending)
		}

		if info.NumPending != 5 {
			return fmt.Errorf("Expected 5 pending, got %d", info.NumPending)
		}
		return nil
	})
}

@derekcollison
Member

What server version?

@derekcollison
Member

If there is no max redelivery set and you are using a pull consumer that never requests any more messages, that is correct: num_ack_pending will not go down.

@ramonberrutti
Contributor

> What server version?

The test is failing with the "main" branch.

I saw that the seq is added to the RedeliveryQueue but not removed from Pending, and when it is sent to the consumer, the timeout is set again.

@derekcollison
Member

What server version are you running? We did improve num_pending for the case where the consumer is pull-based and no additional requests for messages are received.

@ramonberrutti
Contributor

ramonberrutti commented Nov 20, 2024

> What server version are you running? We did improve num_pending for the case where the consumer is pull-based and no additional requests for messages are received.

@davidmcote 2.10.18

The test that I posted is failing on current "main":

$ git log -1 | cat 
commit 32c26d6bdb30b7ac0f5921fdfa0bf018c285cd3c
Merge: 869d3ad2 695cec9f
Author: Derek Collison <[email protected]>
Date:   Wed Nov 20 13:09:35 2024 -0800

$ go test -timeout 60s -run ^TestConsumerNumAckPending$ github.com/nats-io/nats-server/v2/server -count=1 -v -race
=== RUN   TestConsumerNumAckPending
    jetstream_consumer_test.go:2583: Expected no ack pending, got 1
--- FAIL: TestConsumerNumAckPending (10.07s)
FAIL
FAIL    github.com/nats-io/nats-server/v2/server        11.087s
FAIL

@derekcollison
Member

As long as the system can still try to deliver the message, num ack pending will stay fixed until it is acked or max deliveries is hit.

@MauriceVanVeen
Member

This indeed sounds like it's intended behaviour.
When a message is delivered, for example through the next message call above, it will count toward num ack pending.
If you don't ACK it will take until AckWait before you get the message redelivered, and the message's metadata will reflect that with a delivery count of 2. During that time the message is still pending acknowledgement and being redelivered to you after the AckWait, hence why it's counting toward num ack pending.

Num ack pending only goes down if:

  • You ACK/TERM the message, since that signals you don't want the message to be delivered anymore/you've processed it.
  • Your message got removed due to retention policies, like MaxAge (meaning it can't be delivered anymore, as the message is removed).
  • You have MaxDeliver set up for the consumer. After the message has hit the maximum amount of deliveries, num ack pending will decrease and this message will not be delivered to you again.
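
The rules above can be sketched as a small in-memory model. This is a toy illustration under stated assumptions, not nats-server code: `ackTracker` and all of its methods are hypothetical names, and the model assumes a message leaves the pending set at the moment its AckWait expires with MaxDeliver already exhausted.

```go
package main

import "fmt"

// ackTracker is a toy model of the rules listed above (hypothetical names,
// not the server's implementation).
type ackTracker struct {
	maxDeliver int            // <= 0 means unlimited deliveries
	pending    map[uint64]int // stream sequence -> delivery count
}

func newAckTracker(maxDeliver int) *ackTracker {
	return &ackTracker{maxDeliver: maxDeliver, pending: make(map[uint64]int)}
}

// deliver records one delivery attempt for seq.
func (a *ackTracker) deliver(seq uint64) { a.pending[seq]++ }

// ackWaitExpired models the AckWait timer firing for seq. The message stays
// pending (it is only queued for redelivery) unless MaxDeliver is exhausted.
func (a *ackTracker) ackWaitExpired(seq uint64) {
	if a.maxDeliver > 0 && a.pending[seq] >= a.maxDeliver {
		delete(a.pending, seq)
	}
}

// ackOrTerm models an ACK or TERM from the client.
func (a *ackTracker) ackOrTerm(seq uint64) { delete(a.pending, seq) }

// removedFromStream models retention (for example MaxAge) deleting the message.
func (a *ackTracker) removedFromStream(seq uint64) { delete(a.pending, seq) }

// numAckPending is the count of delivered but unresolved messages.
func (a *ackTracker) numAckPending() int { return len(a.pending) }

func main() {
	unlimited := newAckTracker(0)
	unlimited.deliver(1)
	unlimited.ackWaitExpired(1)
	fmt.Println("no MaxDeliver, after AckWait:", unlimited.numAckPending())
	unlimited.ackOrTerm(1)
	fmt.Println("no MaxDeliver, after ack:", unlimited.numAckPending())

	capped := newAckTracker(2)
	capped.deliver(1)
	capped.ackWaitExpired(1)
	capped.deliver(1)
	capped.ackWaitExpired(1)
	fmt.Println("MaxDeliver=2, after second AckWait:", capped.numAckPending())
}
```

In this model, the scenario from the original report corresponds to the `unlimited` tracker: without MaxDeliver, an expired AckWait alone never shrinks the pending set.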

@ramonberrutti
Contributor

I was doing some tests with NakWithDelay. If the number of messages NAKed with NakWithDelay equals MaxAckPending, those messages stay in the pending hashmap, which blocks fetching any other message.

func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
	// ...

	// Check if we have max pending.
	if o.maxp > 0 && len(o.pending) >= o.maxp {
		// maxp only set when ack policy != AckNone and user set MaxAckPending
		// Stall if we have hit max pending.
		return nil, 0, errMaxAckPending
	}

	// ...
}

I have written a test that is failing in the last line as I was expecting to be allowed to continue consuming other messages:

func TestConsumerNumAckPendingWithNakDelay(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"test"},
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:       "dur",
		AckPolicy:     nats.AckExplicitPolicy,
		AckWait:       2 * time.Second,
		MaxAckPending: 2,
	})
	require_NoError(t, err)

	for i := 0; i < 5; i++ {
		_, err := js.Publish("test", nil)
		require_NoError(t, err)
	}

	info, err := js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)

	require_Equal(t, info.NumAckPending, 0)
	require_Equal(t, info.NumPending, 5)

	req := JSApiConsumerGetNextRequest{Batch: 1}
	reqb, _ := json.Marshal(req)

	sub := natsSubSync(t, nc, nats.NewInbox())
	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)

	msg := natsNexMsg(t, sub, time.Second)
	err = msg.NakWithDelay(time.Minute)
	require_NoError(t, err)

	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)

	msg = natsNexMsg(t, sub, time.Second)
	err = msg.NakWithDelay(time.Minute)
	require_NoError(t, err)

	info, err = js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)

	require_Equal(t, info.NumAckPending, 2)
	require_Equal(t, info.NumPending, 3)

	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)

	// Fails because the server never responds as getNextMsg() returns errMaxAckPending
	_ = natsNexMsg(t, sub, time.Second)
}

@MauriceVanVeen
Member

> If the number of messages NAKed with NakWithDelay equals MaxAckPending, those messages stay in the pending hashmap, which blocks fetching any other message.

Correct, that's the intended behaviour.
When you call next message, it increases the num ack pending count; if you then NAK the message, it still counts toward it. A message that's scheduled for redelivery also needs to be acked/termed at some point, so it counts toward num ack pending. It's still a pending message.

With two slight modifications to your test, it will start to pass:

  • Don't plan redelivery after a minute, but after 4 seconds for example (just shorter for demonstration):
err = msg.NakWithDelay(4*time.Second)
  • Then in the last next message call, wait for 5 seconds. The first message you'll get is the first one you called NakWithDelay on:
_ = natsNexMsg(t, sub, 5*time.Second)

You could also leave NakWithDelay at a minute and then wait more than a minute on the last next message call, but that will of course take longer to show the same result as the shorter timeouts above.

If you want to get the third message while the first two messages are scheduled for redelivery after a minute, you'll need to raise MaxAckPending to 3. (If not set explicitly, MaxAckPending defaults to 1000.)

@ramonberrutti
Contributor

ramonberrutti commented Nov 21, 2024

Thank you for the explanation. This issue can be closed if this is the expected behaviour.
