Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer Limits Stream Configuration #979

Merged
merged 6 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions src/main/java/io/nats/client/api/ConsumerLimits.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.JsonValue;

import java.time.Duration;

import static io.nats.client.api.ConsumerConfiguration.*;
import static io.nats.client.support.ApiConstants.INACTIVE_THRESHOLD;
import static io.nats.client.support.ApiConstants.MAX_ACK_PENDING;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readNanos;

/**
* ConsumerLimits
*/
public class ConsumerLimits implements JsonSerializable {
private final Duration inactiveThreshold;
private final Integer maxAckPending;

static ConsumerLimits optionalInstance(JsonValue vConsumerLimits) {
return vConsumerLimits == null ? null : new ConsumerLimits(vConsumerLimits);
}

ConsumerLimits(JsonValue vConsumerLimits) {
inactiveThreshold = readNanos(vConsumerLimits, INACTIVE_THRESHOLD);
maxAckPending = readInteger(vConsumerLimits, MAX_ACK_PENDING);
}

ConsumerLimits(ConsumerLimits.Builder b) {
this.inactiveThreshold = b.inactiveThreshold;
this.maxAckPending = b.maxAckPending;
}

/**
* Get the amount of time before the consumer is deemed inactive.
* @return the inactive threshold
*/
public Duration getInactiveThreshold() {
return inactiveThreshold;
}

/**
* Gets the maximum ack pending configuration.
* @return maximum ack pending.
*/
public long getMaxAckPending() {
return getOrUnset(maxAckPending);
}

public String toJson() {
StringBuilder sb = beginJson();
JsonUtils.addFieldAsNanos(sb, INACTIVE_THRESHOLD, inactiveThreshold);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
return endJson(sb).toString();
}

/**
* Creates a builder for a placements object.
* @return the builder.
*/
public static Builder builder() {
return new Builder();
}

/**
* Placement can be created using a Builder.
*/
public static class Builder {
private Duration inactiveThreshold;
private Integer maxAckPending;

/**
* sets the amount of time before the consumer is deemed inactive.
* @param inactiveThreshold the threshold duration
* @return Builder
*/
public Builder inactiveThreshold(Duration inactiveThreshold) {
this.inactiveThreshold = normalize(inactiveThreshold);
return this;
}

/**
* sets the amount of time before the consumer is deemed inactive.
* @param inactiveThreshold the threshold duration in milliseconds
* @return Builder
*/
public Builder inactiveThreshold(long inactiveThreshold) {
this.inactiveThreshold = normalizeDuration(inactiveThreshold);
return this;
}

/**
* Sets the maximum ack pending or null to unset / clear.
* @param maxAckPending maximum pending acknowledgements.
* @return Builder
*/
public Builder maxAckPending(Long maxAckPending) {
this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
return this;
}

/**
* Sets the maximum ack pending.
* @param maxAckPending maximum pending acknowledgements.
* @return Builder
*/
public Builder maxAckPending(long maxAckPending) {
this.maxAckPending = normalize(maxAckPending, STANDARD_MIN);
return this;
}

/**
* Build a Placement object
* @return the Placement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Build a Placement object
* @return the Placement
* Build a ConsumerLimits object
* @return ConsumerLimits

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah will fix, a lot of cut paste going on

*/
public ConsumerLimits build() {
return new ConsumerLimits(this);
}
}
}
10 changes: 5 additions & 5 deletions src/main/java/io/nats/client/api/Republish.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static io.nats.client.support.JsonValueUtils.readString;

/**
* Republish directives to consider
* Republish Configuration
*/
public class Republish implements JsonSerializable {
private final String source;
Expand All @@ -42,18 +42,20 @@ static Republish optionalInstance(JsonValue vRepublish) {

/**
* Construct a 'republish' object
* @param source the Published Subject-matching filter
* @param source the Published subject matching filter
* @param destination the RePublish Subject template
* @param headersOnly Whether to RePublish only headers (no body)
*/
public Republish(String source, String destination, boolean headersOnly) {
Validator.required(source, "Source");
Validator.required(destination, "Destination");
this.source = source;
this.destination = destination;
this.headersOnly = headersOnly;
}

/**
* Get source, the Published Subject-matching filter
* Get source, the Published subject matching filter
* @return the source
*/
public String getSource() {
Expand Down Expand Up @@ -134,8 +136,6 @@ public Builder headersOnly(Boolean headersOnly) {
* @return the Placement
*/
public Republish build() {
Validator.required(source, "Source");
Validator.required(destination, "Destination");
return new Republish(source, destination, headersOnly);
}
}
Expand Down
65 changes: 53 additions & 12 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class StreamConfiguration implements JsonSerializable {
private final Duration duplicateWindow;
private final Placement placement;
private final Republish republish;
private final SubjectTransform subjectTransform;
private final ConsumerLimits consumerLimits;
private final Mirror mirror;
private final List<Source> sources;
private final boolean sealed;
Expand Down Expand Up @@ -89,6 +91,8 @@ static StreamConfiguration instance(JsonValue v) {
builder.subjects(readStringList(v, SUBJECTS));
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT)));
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH)));
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)));
builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)));
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR)));
builder.sources(Source.optionalListOf(readValue(v, SOURCES)));
builder.sealed(readBoolean(v, SEALED));
Expand Down Expand Up @@ -124,6 +128,8 @@ static StreamConfiguration instance(JsonValue v) {
this.duplicateWindow = b.duplicateWindow;
this.placement = b.placement;
this.republish = b.republish;
this.subjectTransform = b.subjectTransform;
this.consumerLimits = b.consumerLimits;
this.mirror = b.mirror;
this.sources = b.sources;
this.sealed = b.sealed;
Expand Down Expand Up @@ -165,17 +171,12 @@ public String toJson() {
addField(sb, TEMPLATE_OWNER, templateOwner);
addField(sb, DISCARD, discardPolicy.toString());
addFieldAsNanos(sb, DUPLICATE_WINDOW, duplicateWindow);
if (placement != null) {
addField(sb, PLACEMENT, placement);
}
if (republish != null) {
addField(sb, REPUBLISH, republish);
}
if (mirror != null) {
addField(sb, MIRROR, mirror);
}
addField(sb, PLACEMENT, placement);
addField(sb, REPUBLISH, republish);
addField(sb, SUBJECT_TRANSFORM, subjectTransform);
addField(sb, CONSUMER_LIMITS, consumerLimits);
addField(sb, MIRROR, mirror);
addJsons(sb, SOURCES, sources);

addFldWhenTrue(sb, SEALED, sealed);
addFldWhenTrue(sb, ALLOW_ROLLUP_HDRS, allowRollup);
addFldWhenTrue(sb, ALLOW_DIRECT, allowDirect);
Expand Down Expand Up @@ -343,6 +344,22 @@ public Republish getRepublish() {
return republish;
}

/**
* Get the subjectTransform configuration. May be null.
* @return the subjectTransform object
*/
public SubjectTransform getSubjectTransform() {
return subjectTransform;
}

/**
* Get the consumerLimits configuration. May be null.
* @return the consumerLimits object
*/
public ConsumerLimits getConsumerLimits() {
return consumerLimits;
}

/**
* The mirror definition for this stream
* @return the mirror
Expand Down Expand Up @@ -511,6 +528,8 @@ public static class Builder {
private Duration duplicateWindow = Duration.ZERO;
private Placement placement = null;
private Republish republish = null;
private SubjectTransform subjectTransform = null;
private ConsumerLimits consumerLimits = null;
private Mirror mirror = null;
private final List<Source> sources = new ArrayList<>();
private boolean sealed = false;
Expand Down Expand Up @@ -553,6 +572,8 @@ public Builder(StreamConfiguration sc) {
this.duplicateWindow = sc.duplicateWindow;
this.placement = sc.placement;
this.republish = sc.republish;
this.subjectTransform = sc.subjectTransform;
this.consumerLimits = sc.consumerLimits;
this.mirror = sc.mirror;
sources(sc.sources);
this.sealed = sc.sealed;
Expand Down Expand Up @@ -812,15 +833,35 @@ public Builder placement(Placement placement) {
}

/**
* Sets the republish directive object
* @param republish the republish directive object
* Sets the republish config object
* @param republish the republish config object
* @return Builder
*/
public Builder republish(Republish republish) {
this.republish = republish;
return this;
}

/**
* Sets the subjectTransform config object
* @param subjectTransform the subjectTransform config object
* @return Builder
*/
public Builder subjectTransform(SubjectTransform subjectTransform) {
this.subjectTransform = subjectTransform;
return this;
}

/**
* Sets the consumerLimits config object
* @param consumerLimits the consumerLimits config object
* @return Builder
*/
public Builder consumerLimits(ConsumerLimits consumerLimits) {
this.consumerLimits = consumerLimits;
return this;
}

/**
* Sets the mirror object
* @param mirror the mirror object
Expand Down
Loading