Skip to content

Commit

Permalink
kv mirror support (#789)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Nov 8, 2022
1 parent 4f2e11a commit d6bd62a
Show file tree
Hide file tree
Showing 29 changed files with 664 additions and 90 deletions.
11 changes: 9 additions & 2 deletions src/main/java/io/nats/client/JetStreamOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.time.Duration;

import static io.nats.client.support.NatsConstants.DOT;
import static io.nats.client.support.NatsJetStreamConstants.*;
import static io.nats.client.support.Validator.ensureEndsWithDot;
import static io.nats.client.support.Validator.validatePrefixOrDomain;
Expand Down Expand Up @@ -172,8 +173,8 @@ public Builder prefix(String prefix) {
* @return the builder.
*/
public Builder domain(String domain) {
String valid = validatePrefixOrDomain(domain, "Prefix", false);
jsPrefix = valid == null ? null : PREFIX_DOLLAR_JS_DOT + ensureEndsWithDot(valid) + PREFIX_API_DOT;
String prefix = convertDomainToPrefix(domain);
jsPrefix = prefix == null ? null : prefix + DOT;
return this;
}

Expand Down Expand Up @@ -206,4 +207,10 @@ public JetStreamOptions build() {
return new JetStreamOptions(this);
}
}

public static String convertDomainToPrefix(String domain) {
String valid = validatePrefixOrDomain(domain, "Domain", false);
return valid == null ? null
: PREFIX_DOLLAR_JS_DOT + ensureEndsWithDot(valid) + PREFIX_API;
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract class SubscribeOptions {
protected final long messageAlarmTime;
protected final ConsumerConfiguration consumerConfig;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get it's vars
@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected SubscribeOptions(Builder builder, boolean isPull, boolean isOrdered, String deliverSubject, String deliverGroup) {

pull = isPull;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/api/Error.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static Error optionalInstance(String json) {
this.json = json;
code = JsonUtils.readInt(json, CODE_RE, NOT_SET);
apiErrorCode = JsonUtils.readInt(json, ERR_CODE_RE, NOT_SET);
desc = JsonUtils.readString(json, DESCRIPTION_RE, "Unknown JetStream Error");
desc = JsonUtils.readStringMayHaveQuotes(json, DESCRIPTION, "Unknown JetStream Error");
}

Error(int code, int apiErrorCode, String desc) {
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/api/FeatureConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
public abstract class FeatureConfiguration {
protected final StreamConfiguration sc;
protected final String bucketName;



public FeatureConfiguration(StreamConfiguration sc, String bucketName) {
this.sc = sc;
this.bucketName = bucketName;
Expand Down
109 changes: 108 additions & 1 deletion src/main/java/io/nats/client/api/KeyValueConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import io.nats.client.support.NatsKeyValueUtil;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static io.nats.client.support.NatsKeyValueUtil.*;
import static io.nats.client.support.Validator.*;
Expand Down Expand Up @@ -107,6 +111,8 @@ public static class Builder {

String name;
StreamConfiguration.Builder scBuilder;
Mirror mirror;
List<Source> sources = new ArrayList<>();

/**
* Default Builder
Expand Down Expand Up @@ -231,18 +237,119 @@ public Builder republish(Republish republish) {
return this;
}

/**
* Sets the mirror in the KeyValueConfiguration.
* @param mirror the KeyValue's mirror
* @return Builder
*/
public Builder mirror(Mirror mirror) {
this.mirror = mirror;
return this;
}

/**
* Sets the sources in the KeyValueConfiguration.
* @param sources the KeyValue's sources
* @return Builder
*/
public Builder sources(Source... sources) {
this.sources.clear();
return addSources(sources);
}

/**
* Sets the sources in the KeyValueConfiguration
* @param sources the KeyValue's sources
* @return Builder
*/
public Builder sources(Collection<Source> sources) {
this.sources.clear();
return addSources(sources);
}

/**
* Add a source into the KeyValueConfiguration.
* @param source a KeyValue source
* @return Builder
*/
public Builder addSource(Source source) {
if (source != null && !this.sources.contains(source)) {
this.sources.add(source);
}
return this;
}

/**
* Adds the sources into the KeyValueConfiguration
* @param sources the KeyValue's sources to add
* @return Builder
*/
public Builder addSources(Source... sources) {
if (sources != null) {
return addSources(Arrays.asList(sources));
}
return this;
}

/**
* Adds the sources into the KeyValueConfiguration
* @param sources the KeyValue's sources to add
* @return Builder
*/
public Builder addSources(Collection<Source> sources) {
if (sources != null) {
for (Source source : sources) {
if (source != null && !this.sources.contains(source)) {
this.sources.add(source);
}
}
}
return this;
}

/**
* Builds the KeyValueConfiguration
* @return the KeyValueConfiguration.
*/
public KeyValueConfiguration build() {
name = validateBucketName(name, true);
scBuilder.name(toStreamName(name))
.subjects(toStreamSubject(name))
.allowRollup(true)
.allowDirect(true) // by design
.discardPolicy(DiscardPolicy.New)
.denyDelete(true);

if (mirror != null) {
scBuilder.mirrorDirect(true);
String name = mirror.getName();
if (hasPrefix(name)) {
scBuilder.mirror(mirror);
}
else {
scBuilder.mirror(
Mirror.builder(mirror)
.name(toStreamName(name))
.build());
}
}
else if (sources.size() > 0) {
for (Source source : sources) {
String name = source.getName();
if (hasPrefix(name)) {
scBuilder.addSource(source);
}
else {
scBuilder.addSource(
Source.builder(source)
.name(toStreamName(name))
.build());
}
}
}
else {
scBuilder.subjects(toStreamSubject(name));
}

return new KeyValueConfiguration(scBuilder.build());
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/api/KeyValueEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public String getValueAsString() {
}

public Long getValueAsLong() {
String svalue = value == null ? null : new String(value, StandardCharsets.US_ASCII);
return svalue == null ? null : Long.parseLong(svalue);
return value == null ? null : Long.parseLong(new String(value, StandardCharsets.US_ASCII));
}

public long getDataLen() {
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/io/nats/client/api/Mirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import io.nats.client.support.JsonUtils;

import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.MIRROR;

/**
Expand All @@ -34,22 +32,32 @@ static Mirror optionalInstance(String fullJson) {
super(MIRROR, json);
}

Mirror(String name, long startSeq, ZonedDateTime startTime, String filterSubject, External external) {
super(MIRROR, name, startSeq, startTime, filterSubject, external);
Mirror(Builder b) {
super(MIRROR, b);
}

public static Builder builder() {
return new Builder();
}

public static Builder builder(Mirror mirror) {
return new Builder(mirror);
}

public static class Builder extends SourceBaseBuilder<Builder> {
@Override
Builder getThis() {
return this;
}

public Builder() {}

public Builder(Mirror mirror) {
super(mirror);
}

public Mirror build() {
return new Mirror(sourceName, startSeq, startTime, filterSubject, external);
return new Mirror(this);
}
}
}
17 changes: 13 additions & 4 deletions src/main/java/io/nats/client/api/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package io.nats.client.api;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -39,22 +38,32 @@ static List<Source> optionalListOf(String json) {
super(SOURCE, json);
}

Source(String name, long startSeq, ZonedDateTime startTime, String filterSubject, External external) {
super(SOURCE, name, startSeq, startTime, filterSubject, external);
Source(Builder b) {
super(SOURCE, b);
}

public static Builder builder() {
return new Builder();
}

public static Builder builder(Source source) {
return new Builder(source);
}

public static class Builder extends SourceBaseBuilder<Builder> {
@Override
Builder getThis() {
return this;
}

public Builder() {}

public Builder(Source source) {
super(source);
}

public Source build() {
return new Source(sourceName, startSeq, startTime, filterSubject, external);
return new Source(this);
}
}
}
Loading

0 comments on commit d6bd62a

Please sign in to comment.