Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer Limits Stream Configuration #979

Merged
merged 6 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerLimits.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.JsonValue;

import java.time.Duration;

import static io.nats.client.api.ConsumerConfiguration.*;
import static io.nats.client.support.ApiConstants.INACTIVE_THRESHOLD;
import static io.nats.client.support.ApiConstants.MAX_ACK_PENDING;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readNanos;

/**
* ConsumerLimits
*/
public class ConsumerLimits implements JsonSerializable {
private final Duration inactiveThreshold;
private final Integer maxAckPending;

static ConsumerLimits optionalInstance(JsonValue vConsumerLimits) {
return vConsumerLimits == null ? null : new ConsumerLimits(vConsumerLimits);
}

ConsumerLimits(JsonValue vConsumerLimits) {
inactiveThreshold = readNanos(vConsumerLimits, INACTIVE_THRESHOLD);
maxAckPending = readInteger(vConsumerLimits, MAX_ACK_PENDING);
}

ConsumerLimits(ConsumerLimits.Builder b) {
this.inactiveThreshold = b.inactiveThreshold;
this.maxAckPending = b.maxAckPending;
}

/**
* Get the amount of time before the consumer is deemed inactive.
* @return the inactive threshold
*/
public Duration getInactiveThreshold() {
return inactiveThreshold;
}

/**
* Gets the maximum ack pending configuration.
* @return maximum ack pending.
*/
public long getMaxAckPending() {
return getOrUnset(maxAckPending);
}

public String toJson() {
StringBuilder sb = beginJson();
JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
return endJson(sb).toString();
}

/**
* Creates a builder for a consumer limits object.
* @return the builder.
*/
public static Builder builder() {
return new Builder();
}

/**
* ConsumerLimits can be created using a Builder.
*/
public static class Builder {
private Duration inactiveThreshold;
private Integer maxAckPending;

/**
* sets the amount of time before the consumer is deemed inactive.
* @param inactiveThreshold the threshold duration
* @return Builder
*/
public Builder inactiveThreshold(Duration inactiveThreshold) {
this.inactiveThreshold = normalize(inactiveThreshold);
return this;
}

/**
* sets the amount of time before the consumer is deemed inactive.
* @param inactiveThreshold the threshold duration in milliseconds
* @return Builder
*/
public Builder inactiveThreshold(long inactiveThreshold) {
this.inactiveThreshold = normalizeDuration(inactiveThreshold);
return this;
}

/**
* Sets the maximum ack pending or null to unset / clear.
* @param maxAckPending maximum pending acknowledgements.
* @return Builder
*/
public Builder maxAckPending(Long maxAckPending) {
this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
return this;
}

/**
* Sets the maximum ack pending.
* @param maxAckPending maximum pending acknowledgements.
* @return Builder
*/
public Builder maxAckPending(long maxAckPending) {
this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
return this;
}

/**
* Build a ConsumerLimits object
* @return the ConsumerLimits
*/
public ConsumerLimits build() {
return new ConsumerLimits(this);
}
}
}
41 changes: 28 additions & 13 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class StreamConfiguration implements JsonSerializable {
private final Placement placement;
private final Republish republish;
private final SubjectTransform subjectTransform;
private final ConsumerLimits consumerLimits;
private final Mirror mirror;
private final List<Source> sources;
private final boolean sealed;
Expand Down Expand Up @@ -91,6 +92,7 @@ static StreamConfiguration instance(JsonValue v) {
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT)));
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH)));
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)));
builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)));
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR)));
builder.sources(Source.optionalListOf(readValue(v, SOURCES)));
builder.sealed(readBoolean(v, SEALED));
Expand Down Expand Up @@ -127,6 +129,7 @@ static StreamConfiguration instance(JsonValue v) {
this.placement = b.placement;
this.republish = b.republish;
this.subjectTransform = b.subjectTransform;
this.consumerLimits = b.consumerLimits;
this.mirror = b.mirror;
this.sources = b.sources;
this.sealed = b.sealed;
Expand Down Expand Up @@ -168,20 +171,12 @@ public String toJson() {
addField(sb, TEMPLATE_OWNER, templateOwner);
addField(sb, DISCARD, discardPolicy.toString());
addFieldAsNanos(sb, DUPLICATE_WINDOW, duplicateWindow);
if (placement != null) {
addField(sb, PLACEMENT, placement);
}
if (republish != null) {
addField(sb, REPUBLISH, republish);
}
if (subjectTransform != null) {
addField(sb, SUBJECT_TRANSFORM, subjectTransform);
}
if (mirror != null) {
addField(sb, MIRROR, mirror);
}
addField(sb, PLACEMENT, placement);
addField(sb, REPUBLISH, republish);
addField(sb, SUBJECT_TRANSFORM, subjectTransform);
addField(sb, CONSUMER_LIMITS, consumerLimits);
addField(sb, MIRROR, mirror);
addJsons(sb, SOURCES, sources);

addFldWhenTrue(sb, SEALED, sealed);
addFldWhenTrue(sb, ALLOW_ROLLUP_HDRS, allowRollup);
addFldWhenTrue(sb, ALLOW_DIRECT, allowDirect);
Expand Down Expand Up @@ -357,6 +352,14 @@ public SubjectTransform getSubjectTransform() {
return subjectTransform;
}

/**
* Get the consumerLimits configuration. May be null.
* @return the consumerLimits object
*/
public ConsumerLimits getConsumerLimits() {
return consumerLimits;
}

/**
* The mirror definition for this stream
* @return the mirror
Expand Down Expand Up @@ -526,6 +529,7 @@ public static class Builder {
private Placement placement = null;
private Republish republish = null;
private SubjectTransform subjectTransform = null;
private ConsumerLimits consumerLimits = null;
private Mirror mirror = null;
private final List<Source> sources = new ArrayList<>();
private boolean sealed = false;
Expand Down Expand Up @@ -569,6 +573,7 @@ public Builder(StreamConfiguration sc) {
this.placement = sc.placement;
this.republish = sc.republish;
this.subjectTransform = sc.subjectTransform;
this.consumerLimits = sc.consumerLimits;
this.mirror = sc.mirror;
sources(sc.sources);
this.sealed = sc.sealed;
Expand Down Expand Up @@ -847,6 +852,16 @@ public Builder subjectTransform(SubjectTransform subjectTransform) {
return this;
}

/**
* Sets the consumerLimits config object
* @param consumerLimits the consumerLimits config object
* @return Builder
*/
public Builder consumerLimits(ConsumerLimits consumerLimits) {
this.consumerLimits = consumerLimits;
return this;
}

/**
* Sets the mirror object
* @param mirror the mirror object
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public interface ApiConstants {
String CONNECT_URLS = "connect_urls";
String CONSUMER_COUNT = "consumer_count";
String CONSUMER_SEQ = "consumer_seq";
String CONSUMER_LIMITS = "consumer_limits";
String CONSUMERS = "consumers";
String CREATED = "created";
String CURRENT = "current";
Expand Down
45 changes: 44 additions & 1 deletion src/test/java/io/nats/client/api/StreamConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static io.nats.client.api.CompressionOption.None;
import static io.nats.client.api.CompressionOption.S2;
import static io.nats.client.api.ConsumerConfiguration.*;
import static org.junit.jupiter.api.Assertions.*;

public class StreamConfigurationTests extends JetStreamTestBase {
Expand Down Expand Up @@ -105,7 +106,8 @@ public void testConstruction() {
.denyPurge(testSc.getDenyPurge())
.discardNewPerSubject(testSc.isDiscardNewPerSubject())
.metadata(metaData)
.firstSequence(82942);
.firstSequence(82942)
.consumerLimits(testSc.getConsumerLimits());
validate(builder.build(), false);
validate(builder.addSources((Source)null).build(), false);

Expand Down Expand Up @@ -461,6 +463,10 @@ private void validate(StreamConfiguration sc, boolean serverTest) {
assertNotNull(sc.getSubjectTransform());
assertEquals("st.>", sc.getSubjectTransform().getSource());
assertEquals("stdest.>", sc.getSubjectTransform().getDestination());

assertNotNull(sc.getConsumerLimits());
assertEquals(Duration.ofSeconds(50), sc.getConsumerLimits().getInactiveThreshold());
assertEquals(42, sc.getConsumerLimits().getMaxAckPending());
}
}

Expand Down Expand Up @@ -520,6 +526,43 @@ public void testSubjectTransform() {
assertNull(st.getDestination());
}

@Test
public void testConsumerLimits() {
ConsumerLimits cl = ConsumerLimits.builder().build();
assertEquals(null, cl.getInactiveThreshold());
assertEquals(INTEGER_UNSET, cl.getMaxAckPending());

cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(0)).build();
assertEquals(Duration.ZERO, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(0L).build();
assertEquals(Duration.ZERO, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(1)).build();
assertEquals(Duration.ofMillis(1), cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(1L).build();
assertEquals(Duration.ofMillis(1), cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(Duration.ofMillis(-1)).build();
assertEquals(DURATION_UNSET, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().inactiveThreshold(-1).build();
assertEquals(DURATION_UNSET, cl.getInactiveThreshold());

cl = ConsumerLimits.builder().maxAckPending(STANDARD_MIN).build();
assertEquals(STANDARD_MIN, cl.getMaxAckPending());

cl = ConsumerLimits.builder().maxAckPending(INTEGER_UNSET).build();
assertEquals(INTEGER_UNSET, cl.getMaxAckPending());

cl = ConsumerLimits.builder().maxAckPending(-2).build();
assertEquals(INTEGER_UNSET, cl.getMaxAckPending());

cl = ConsumerLimits.builder().maxAckPending(Long.MAX_VALUE).build();
assertEquals(Integer.MAX_VALUE, cl.getMaxAckPending());
}

@Test
public void testExternal() {
External e = External.builder().api("api").deliver("deliver").build();
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/data/StreamConfiguration.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
"src": "st.>",
"dest": "stdest.>"
},
"consumer_limits": {
"inactive_threshold": 50000000000,
"max_ack_pending": 42
},
"mirror": {
"name": "eman",
"opt_start_seq": 736,
Expand Down