Skip to content

Commit

Permalink
better setup of pending limits (#804)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Nov 23, 2022
1 parent 09275d8 commit 364f9d0
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/PushSubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ public static class Builder
private boolean ordered;
private String deliverSubject;
private String deliverGroup;
private long pendingMessageLimit = -1;
private long pendingByteLimit = -1;
private long pendingMessageLimit = Consumer.DEFAULT_MAX_MESSAGES;
private long pendingByteLimit = Consumer.DEFAULT_MAX_BYTES;

@Override
protected Builder getThis() {
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/io/nats/client/impl/NatsConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,21 @@ abstract class NatsConsumer implements Consumer {

/**
* Set limits on the maximum number of messages, or maximum size of messages
* this consumer will hold before it starts to drop new messages waiting for the
* application to drain the queue.
*
* this consumer will hold before it starts to drop new messages waiting.
* <p>
* Messages are dropped as they encounter a full queue, which is to say, new
* messages are dropped rather than old messages. If a queue is 10 deep and
* fills up, the 11th message is dropped.
*
* <p>
* Any value less than or equal to zero means unlimited and will be stored as 0.
* @param maxMessages the maximum message count to hold, defaults to
* {{@value #DEFAULT_MAX_MESSAGES}}.
* @param maxBytes the maximum bytes to hold, defaults to
* {{@value #DEFAULT_MAX_BYTES}}.
*/
public void setPendingLimits(long maxMessages, long maxBytes) {
this.maxMessages.set(maxMessages < 0 ? Consumer.DEFAULT_MAX_MESSAGES : maxMessages);
this.maxBytes.set(maxBytes < 0 ? Consumer.DEFAULT_MAX_BYTES : maxBytes);
this.maxMessages.set(maxMessages <= 0 ? 0 : maxMessages);
this.maxBytes.set(maxBytes <= 0 ? 0 : maxBytes);
}

/**
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ JetStreamSubscription createSubscription(String subject,
if (so.isOrdered() && qgroup != null) {
throw JsSubOrderedNotAllowOnQueues.instance();
}

if (dispatcher != null &&
(so.getPendingMessageLimit() != Consumer.DEFAULT_MAX_MESSAGES ||
so.getPendingByteLimit() != Consumer.DEFAULT_MAX_BYTES))
{
throw JsSubPushAsyncCantSetPending.instance();
}
}

// 2A. Flow Control / heartbeat not always valid
Expand Down Expand Up @@ -403,9 +410,6 @@ else if (so.isOrdered()) {
if (lDispatcher == null) {
nsub.setPendingLimits(so.getPendingMessageLimit(), so.getPendingByteLimit());
}
else if (so.getPendingMessageLimit() >= 0 || so.getPendingByteLimit() >= 0){
throw JsSubPushAsyncCantSetPending.instance();
}
return nsub;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testStreamCreate() throws Exception {
assertEquals(2, sc.getSubjects().size());
assertEquals(subject(0), sc.getSubjects().get(0));
assertEquals(subject(1), sc.getSubjects().get(1));
assertTrue(subject(0).compareTo(subject(1)) != 0); // coverage

assertEquals(RetentionPolicy.Limits, sc.getRetentionPolicy());
assertEquals(DiscardPolicy.Old, sc.getDiscardPolicy());
Expand Down
63 changes: 45 additions & 18 deletions src/test/java/io/nats/client/impl/JetStreamPushTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -661,38 +661,61 @@ public void testPendingLimits() throws Exception {
// create the stream.
createDefaultTestStream(nc);

int customMessageLimit = 1000;
int customByteLimit = 1024 * 1024;

PushSubscribeOptions psoDefaultSync = PushSubscribeOptions.builder()
.build();

int customMessageLimit = 1000;
int customByteLimit = 1024 * 1024;
PushSubscribeOptions psoCustomSync = PushSubscribeOptions.builder()
.pendingMessageLimit(customMessageLimit)
.pendingByteLimit(customByteLimit)
.build();

JetStreamSubscription subDefaultSync = js.subscribe(SUBJECT, psoDefaultSync);
JetStreamSubscription subCustomSync = js.subscribe(SUBJECT, psoCustomSync);
PushSubscribeOptions psoCustomSyncUnlimited0 = PushSubscribeOptions.builder()
.pendingMessageLimit(0)
.pendingByteLimit(0)
.build();

PushSubscribeOptions psoCustomSyncUnlimitedUnlimitedNegative = PushSubscribeOptions.builder()
.pendingMessageLimit(-1)
.pendingByteLimit(-1)
.build();

JetStreamSubscription syncSub = js.subscribe(SUBJECT, psoDefaultSync);
assertEquals(Consumer.DEFAULT_MAX_MESSAGES, syncSub.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, syncSub.getPendingByteLimit());

assertEquals(Consumer.DEFAULT_MAX_MESSAGES, subDefaultSync.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, subDefaultSync.getPendingByteLimit());
syncSub = js.subscribe(SUBJECT, psoCustomSync);
assertEquals(customMessageLimit, syncSub.getPendingMessageLimit());
assertEquals(customByteLimit, syncSub.getPendingByteLimit());

assertEquals(customMessageLimit, subCustomSync.getPendingMessageLimit());
assertEquals(customByteLimit, subCustomSync.getPendingByteLimit());
syncSub = js.subscribe(SUBJECT, psoCustomSyncUnlimited0);
assertEquals(0, syncSub.getPendingMessageLimit());
assertEquals(0, syncSub.getPendingByteLimit());

syncSub = js.subscribe(SUBJECT, psoCustomSyncUnlimitedUnlimitedNegative);
assertEquals(0, syncSub.getPendingMessageLimit());
assertEquals(0, syncSub.getPendingByteLimit());

Dispatcher d = nc.createDispatcher();
d.setPendingLimits(customMessageLimit, customByteLimit);
assertEquals(customMessageLimit, d.getPendingMessageLimit());
assertEquals(customByteLimit, d.getPendingByteLimit());

PushSubscribeOptions psoAsyncDefault = PushSubscribeOptions.builder()
PushSubscribeOptions psoAsyncDefault = PushSubscribeOptions.builder().build();
PushSubscribeOptions psoAsyncNonDefaultValid = PushSubscribeOptions.builder()
.pendingMessageLimit(Consumer.DEFAULT_MAX_MESSAGES)
.pendingByteLimit(Consumer.DEFAULT_MAX_BYTES)
.build();

// any negative is treated as not set / ignored
PushSubscribeOptions psoAsyncLtZero = PushSubscribeOptions.builder()
.pendingMessageLimit(-999)
.pendingByteLimit(-999)
.build();
JetStreamSubscription subAsync = js.subscribe(SUBJECT, d, m -> {}, false, psoAsyncDefault);
assertEquals(Consumer.DEFAULT_MAX_MESSAGES, subAsync.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, subAsync.getPendingByteLimit());

subAsync = js.subscribe(SUBJECT, d, m -> {}, false, psoAsyncNonDefaultValid);
assertEquals(Consumer.DEFAULT_MAX_MESSAGES, subAsync.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, subAsync.getPendingByteLimit());

PushSubscribeOptions psoAsyncNopeMessages = PushSubscribeOptions.builder()
.pendingMessageLimit(customMessageLimit)
Expand All @@ -702,14 +725,18 @@ public void testPendingLimits() throws Exception {
.pendingByteLimit(customByteLimit)
.build();

JetStreamSubscription subAsyncDefault = js.subscribe(SUBJECT, d, m ->{}, false, psoAsyncDefault);
assertEquals(Consumer.DEFAULT_MAX_MESSAGES, subAsyncDefault.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, subAsyncDefault.getPendingByteLimit());
PushSubscribeOptions psoAsyncNope2Messages = PushSubscribeOptions.builder()
.pendingMessageLimit(0)
.build();

js.subscribe(SUBJECT, d, m ->{}, false, psoAsyncLtZero);
PushSubscribeOptions psoAsyncNope2Bytes = PushSubscribeOptions.builder()
.pendingByteLimit(0)
.build();

assertClientError(JsSubPushAsyncCantSetPending, () -> js.subscribe(SUBJECT, d, m ->{}, false, psoAsyncNopeMessages));
assertClientError(JsSubPushAsyncCantSetPending, () -> js.subscribe(SUBJECT, d, m ->{}, false, psoAsyncNopeBytes));
assertClientError(JsSubPushAsyncCantSetPending, () -> js.subscribe(SUBJECT, d, m ->{}, false, psoAsyncNope2Messages));
assertClientError(JsSubPushAsyncCantSetPending, () -> js.subscribe(SUBJECT, d, m ->{}, false, psoAsyncNope2Bytes));
});
}
}
8 changes: 4 additions & 4 deletions src/test/java/io/nats/client/impl/SlowConsumerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testSlowSubscriberByMessages() throws Exception {
sub.setPendingLimits(1, -1);

assertEquals(1, sub.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, sub.getPendingByteLimit());
assertEquals(0, sub.getPendingByteLimit());
assertEquals(0, sub.getDroppedCount());

nc.publish("subject", null);
Expand Down Expand Up @@ -85,7 +85,7 @@ public void testSlowSubscriberByBytes() throws Exception {
sub.setPendingLimits(-1, 10); // will take the first, not the second

assertEquals(10, sub.getPendingByteLimit());
assertEquals(Consumer.DEFAULT_MAX_MESSAGES, sub.getPendingMessageLimit());
assertEquals(0, sub.getPendingMessageLimit());
assertEquals(0, sub.getDroppedCount());

nc.publish("subject", null);
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testSlowSDispatcherByMessages() throws Exception {
d.subscribe("subject");

assertEquals(1, d.getPendingMessageLimit());
assertEquals(Consumer.DEFAULT_MAX_BYTES, d.getPendingByteLimit());
assertEquals(0, d.getPendingByteLimit());
assertEquals(0, d.getDroppedCount());

nc.publish("subject", null);
Expand Down Expand Up @@ -162,7 +162,7 @@ public void testSlowSDispatcherByBytes() throws Exception {
d.setPendingLimits(-1, 10);
d.subscribe("subject");

assertEquals(Consumer.DEFAULT_MAX_MESSAGES, d.getPendingMessageLimit());
assertEquals(0, d.getPendingMessageLimit());
assertEquals(10, d.getPendingByteLimit());
assertEquals(0, d.getDroppedCount());

Expand Down

0 comments on commit 364f9d0

Please sign in to comment.