From d6bd62a539c3088c7488bec24c6b571335494bc0 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Tue, 8 Nov 2022 14:49:23 -0500 Subject: [PATCH] kv mirror support (#789) --- .../java/io/nats/client/JetStreamOptions.java | 11 +- .../java/io/nats/client/SubscribeOptions.java | 2 +- src/main/java/io/nats/client/api/Error.java | 2 +- .../nats/client/api/FeatureConfiguration.java | 3 +- .../client/api/KeyValueConfiguration.java | 109 ++++++++++- .../io/nats/client/api/KeyValueEntry.java | 3 +- src/main/java/io/nats/client/api/Mirror.java | 18 +- src/main/java/io/nats/client/api/Source.java | 17 +- .../java/io/nats/client/api/SourceBase.java | 67 +++++-- .../nats/client/api/StreamConfiguration.java | 50 ++++- .../io/nats/client/impl/NatsKeyValue.java | 71 ++++--- .../client/impl/NatsKeyValueManagement.java | 3 + .../impl/NatsKeyValueWatchSubscription.java | 2 +- .../client/impl/NatsWatchSubscription.java | 18 +- .../io/nats/client/support/ApiConstants.java | 2 + .../io/nats/client/support/BuilderBase.java | 5 - .../io/nats/client/support/JsonUtils.java | 30 +++ .../support/NatsJetStreamConstants.java | 2 +- .../nats/client/support/NatsKeyValueUtil.java | 11 ++ .../io/nats/client/JetStreamOptionsTests.java | 2 +- .../client/api/StreamConfigurationTests.java | 20 ++ .../client/impl/JetStreamManagementTests.java | 6 + ...va => JetStreamMirrorAndSourcesTests.java} | 2 +- .../io/nats/client/impl/KeyValueTests.java | 178 ++++++++++++++++++ .../io/nats/client/impl/RequestTests.java | 5 +- .../client/support/ByteArrayBuilderTests.java | 52 ++++- .../nats/client/support/JsonUtilsTests.java | 18 ++ .../java/io/nats/client/utils/TestBase.java | 44 +++++ .../resources/data/StreamConfiguration.json | 1 + 29 files changed, 664 insertions(+), 90 deletions(-) rename src/test/java/io/nats/client/impl/{JetStreamMirrorTests.java => JetStreamMirrorAndSourcesTests.java} (99%) diff --git a/src/main/java/io/nats/client/JetStreamOptions.java b/src/main/java/io/nats/client/JetStreamOptions.java index 17254aff6..a34358080 100644 --- a/src/main/java/io/nats/client/JetStreamOptions.java +++ b/src/main/java/io/nats/client/JetStreamOptions.java @@ -15,6 +15,7 @@ import java.time.Duration; +import static io.nats.client.support.NatsConstants.DOT; import static io.nats.client.support.NatsJetStreamConstants.*; import static io.nats.client.support.Validator.ensureEndsWithDot; import static io.nats.client.support.Validator.validatePrefixOrDomain; @@ -172,8 +173,8 @@ public Builder prefix(String prefix) { * @return the builder. */ public Builder domain(String domain) { - String valid = validatePrefixOrDomain(domain, "Prefix", false); - jsPrefix = valid == null ? null : PREFIX_DOLLAR_JS_DOT + ensureEndsWithDot(valid) + PREFIX_API_DOT; + String prefix = convertDomainToPrefix(domain); + jsPrefix = prefix == null ? null : prefix + DOT; return this; } @@ -206,4 +207,10 @@ public JetStreamOptions build() { return new JetStreamOptions(this); } } + + public static String convertDomainToPrefix(String domain) { + String valid = validatePrefixOrDomain(domain, "Domain", false); + return valid == null ? null + : PREFIX_DOLLAR_JS_DOT + ensureEndsWithDot(valid) + PREFIX_API; + } } diff --git a/src/main/java/io/nats/client/SubscribeOptions.java b/src/main/java/io/nats/client/SubscribeOptions.java index d9a1b2b69..a93cf5c3a 100644 --- a/src/main/java/io/nats/client/SubscribeOptions.java +++ b/src/main/java/io/nats/client/SubscribeOptions.java @@ -35,7 +35,7 @@ public abstract class SubscribeOptions { protected final long messageAlarmTime; protected final ConsumerConfiguration consumerConfig; - @SuppressWarnings("rawtypes") // Don't need the type of the builder to get it's vars + @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars protected SubscribeOptions(Builder builder, boolean isPull, boolean isOrdered, String deliverSubject, String deliverGroup) { pull = isPull; diff --git a/src/main/java/io/nats/client/api/Error.java b/src/main/java/io/nats/client/api/Error.java index 79b76f57d..57655b4f9 100644 --- a/src/main/java/io/nats/client/api/Error.java +++ b/src/main/java/io/nats/client/api/Error.java @@ -39,7 +39,7 @@ static Error optionalInstance(String json) { this.json = json; code = JsonUtils.readInt(json, CODE_RE, NOT_SET); apiErrorCode = JsonUtils.readInt(json, ERR_CODE_RE, NOT_SET); - desc = JsonUtils.readString(json, DESCRIPTION_RE, "Unknown JetStream Error"); + desc = JsonUtils.readStringMayHaveQuotes(json, DESCRIPTION, "Unknown JetStream Error"); } Error(int code, int apiErrorCode, String desc) { diff --git a/src/main/java/io/nats/client/api/FeatureConfiguration.java b/src/main/java/io/nats/client/api/FeatureConfiguration.java index 1fa8e48ff..1ead2254b 100644 --- a/src/main/java/io/nats/client/api/FeatureConfiguration.java +++ b/src/main/java/io/nats/client/api/FeatureConfiguration.java @@ -18,8 +18,7 @@ public abstract class FeatureConfiguration { protected final StreamConfiguration sc; protected final String bucketName; - - + public FeatureConfiguration(StreamConfiguration sc, String bucketName) { this.sc = sc; this.bucketName = bucketName; diff --git a/src/main/java/io/nats/client/api/KeyValueConfiguration.java b/src/main/java/io/nats/client/api/KeyValueConfiguration.java index faa5f9ece..f069d624e 100644 --- a/src/main/java/io/nats/client/api/KeyValueConfiguration.java +++ b/src/main/java/io/nats/client/api/KeyValueConfiguration.java @@ -15,6 +15,10 @@ import io.nats.client.support.NatsKeyValueUtil; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import static io.nats.client.support.NatsKeyValueUtil.*; import static io.nats.client.support.Validator.*; @@ -107,6 +111,8 @@ public static class Builder { String name; StreamConfiguration.Builder scBuilder; + Mirror mirror; + List sources = new ArrayList<>(); /** * Default Builder @@ -231,6 +237,76 @@ public Builder republish(Republish republish) { return this; } + /** + * Sets the mirror in the KeyValueConfiguration. + * @param mirror the KeyValue's mirror + * @return Builder + */ + public Builder mirror(Mirror mirror) { + this.mirror = mirror; + return this; + } + + /** + * Sets the sources in the KeyValueConfiguration. + * @param sources the KeyValue's sources + * @return Builder + */ + public Builder sources(Source... sources) { + this.sources.clear(); + return addSources(sources); + } + + /** + * Sets the sources in the KeyValueConfiguration + * @param sources the KeyValue's sources + * @return Builder + */ + public Builder sources(Collection sources) { + this.sources.clear(); + return addSources(sources); + } + + /** + * Add a source into the KeyValueConfiguration. + * @param source a KeyValue source + * @return Builder + */ + public Builder addSource(Source source) { + if (source != null && !this.sources.contains(source)) { + this.sources.add(source); + } + return this; + } + + /** + * Adds the sources into the KeyValueConfiguration + * @param sources the KeyValue's sources to add + * @return Builder + */ + public Builder addSources(Source... sources) { + if (sources != null) { + return addSources(Arrays.asList(sources)); + } + return this; + } + + /** + * Adds the sources into the KeyValueConfiguration + * @param sources the KeyValue's sources to add + * @return Builder + */ + public Builder addSources(Collection sources) { + if (sources != null) { + for (Source source : sources) { + if (source != null && !this.sources.contains(source)) { + this.sources.add(source); + } + } + } + return this; + } + /** * Builds the KeyValueConfiguration * @return the KeyValueConfiguration. @@ -238,11 +314,42 @@ public Builder republish(Republish republish) { public KeyValueConfiguration build() { name = validateBucketName(name, true); scBuilder.name(toStreamName(name)) - .subjects(toStreamSubject(name)) .allowRollup(true) .allowDirect(true) // by design .discardPolicy(DiscardPolicy.New) .denyDelete(true); + + if (mirror != null) { + scBuilder.mirrorDirect(true); + String name = mirror.getName(); + if (hasPrefix(name)) { + scBuilder.mirror(mirror); + } + else { + scBuilder.mirror( + Mirror.builder(mirror) + .name(toStreamName(name)) + .build()); + } + } + else if (sources.size() > 0) { + for (Source source : sources) { + String name = source.getName(); + if (hasPrefix(name)) { + scBuilder.addSource(source); + } + else { + scBuilder.addSource( + Source.builder(source) + .name(toStreamName(name)) + .build()); + } + } + } + else { + scBuilder.subjects(toStreamSubject(name)); + } + return new KeyValueConfiguration(scBuilder.build()); } } diff --git a/src/main/java/io/nats/client/api/KeyValueEntry.java b/src/main/java/io/nats/client/api/KeyValueEntry.java index 6ad4ed605..c31170b1b 100644 --- a/src/main/java/io/nats/client/api/KeyValueEntry.java +++ b/src/main/java/io/nats/client/api/KeyValueEntry.java @@ -74,8 +74,7 @@ public String getValueAsString() { } public Long getValueAsLong() { - String svalue = value == null ? null : new String(value, StandardCharsets.US_ASCII); - return svalue == null ? null : Long.parseLong(svalue); + return value == null ? null : Long.parseLong(new String(value, StandardCharsets.US_ASCII)); } public long getDataLen() { diff --git a/src/main/java/io/nats/client/api/Mirror.java b/src/main/java/io/nats/client/api/Mirror.java index 5aa92df3a..9d5cd6bdf 100644 --- a/src/main/java/io/nats/client/api/Mirror.java +++ b/src/main/java/io/nats/client/api/Mirror.java @@ -15,8 +15,6 @@ import io.nats.client.support.JsonUtils; -import java.time.ZonedDateTime; - import static io.nats.client.support.ApiConstants.MIRROR; /** @@ -34,22 +32,32 @@ static Mirror optionalInstance(String fullJson) { super(MIRROR, json); } - Mirror(String name, long startSeq, ZonedDateTime startTime, String filterSubject, External external) { - super(MIRROR, name, startSeq, startTime, filterSubject, external); + Mirror(Builder b) { + super(MIRROR, b); } public static Builder builder() { return new Builder(); } + public static Builder builder(Mirror mirror) { + return new Builder(mirror); + } + public static class Builder extends SourceBaseBuilder { @Override Builder getThis() { return this; } + public Builder() {} + + public Builder(Mirror mirror) { + super(mirror); + } + public Mirror build() { - return new Mirror(sourceName, startSeq, startTime, filterSubject, external); + return new Mirror(this); } } } diff --git a/src/main/java/io/nats/client/api/Source.java b/src/main/java/io/nats/client/api/Source.java index b9c0e2c7b..f51c38127 100644 --- a/src/main/java/io/nats/client/api/Source.java +++ b/src/main/java/io/nats/client/api/Source.java @@ -13,7 +13,6 @@ package io.nats.client.api; -import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; @@ -39,22 +38,32 @@ static List optionalListOf(String json) { super(SOURCE, json); } - Source(String name, long startSeq, ZonedDateTime startTime, String filterSubject, External external) { - super(SOURCE, name, startSeq, startTime, filterSubject, external); + Source(Builder b) { + super(SOURCE, b); } public static Builder builder() { return new Builder(); } + public static Builder builder(Source source) { + return new Builder(source); + } + public static class Builder extends SourceBaseBuilder { @Override Builder getThis() { return this; } + public Builder() {} + + public Builder(Source source) { + super(source); + } + public Source build() { - return new Source(sourceName, startSeq, startTime, filterSubject, external); + return new Source(this); } } } diff --git a/src/main/java/io/nats/client/api/SourceBase.java b/src/main/java/io/nats/client/api/SourceBase.java index d5053ae0f..ac890efe2 100644 --- a/src/main/java/io/nats/client/api/SourceBase.java +++ b/src/main/java/io/nats/client/api/SourceBase.java @@ -18,11 +18,12 @@ import java.time.ZonedDateTime; +import static io.nats.client.JetStreamOptions.convertDomainToPrefix; import static io.nats.client.support.ApiConstants.*; import static io.nats.client.support.JsonUtils.*; abstract class SourceBase implements JsonSerializable { - private final String sourceName; + private final String name; private final long startSeq; private final ZonedDateTime startTime; private final String filterSubject; @@ -30,7 +31,7 @@ abstract class SourceBase implements JsonSerializable { private final String objectName; SourceBase(String objectName, String json) { - sourceName = JsonUtils.readString(json, NAME_RE); + name = JsonUtils.readString(json, NAME_RE); startSeq = JsonUtils.readLong(json, OPT_START_SEQ_RE, 0); startTime = JsonUtils.readDate(json, OPT_START_TIME_RE); filterSubject = JsonUtils.readString(json, FILTER_SUBJECT_RE); @@ -38,12 +39,13 @@ abstract class SourceBase implements JsonSerializable { this.objectName = normalize(objectName); } - SourceBase(String objectName, String sourceName, long startSeq, ZonedDateTime startTime, String filterSubject, External external) { - this.sourceName = sourceName; - this.startSeq = startSeq; - this.startTime = startTime; - this.filterSubject = filterSubject; - this.external = external; + @SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars + SourceBase(String objectName, SourceBaseBuilder b) { + this.name = b.name; + this.startSeq = b.startSeq; + this.startTime = b.startTime; + this.filterSubject = b.filterSubject; + this.external = b.external; this.objectName = normalize(objectName); } @@ -54,7 +56,7 @@ abstract class SourceBase implements JsonSerializable { */ public String toJson() { StringBuilder sb = beginJson(); - JsonUtils.addField(sb, NAME, sourceName); + JsonUtils.addField(sb, NAME, name); if (startSeq > 0) { JsonUtils.addField(sb, OPT_START_SEQ, startSeq); } @@ -64,8 +66,20 @@ public String toJson() { return endJson(sb).toString(); } + /** + * Get the name of the source. Same as getName() + * @return get the source name + */ public String getSourceName() { - return sourceName; + return name; + } + + /** + * Get the name of the source. Same as getSourceName() + * @return the source name + */ + public String getName() { + return name; } public long getStartSeq() { @@ -87,7 +101,7 @@ public External getExternal() { @Override public String toString() { return objectName + "{" + - "sourceName='" + sourceName + '\'' + + "name='" + name + '\'' + ", startSeq=" + startSeq + ", startTime=" + startTime + ", filterSubject='" + filterSubject + '\'' + @@ -96,7 +110,7 @@ public String toString() { } public abstract static class SourceBaseBuilder { - String sourceName; + String name; long startSeq; ZonedDateTime startTime; String filterSubject; @@ -104,8 +118,23 @@ public abstract static class SourceBaseBuilder { abstract T getThis(); - public T sourceName(String sourceName) { - this.sourceName = sourceName; + public SourceBaseBuilder() {} + + public SourceBaseBuilder(SourceBase base) { + this.name = base.name; + this.startSeq = base.startSeq; + this.startTime = base.startTime; + this.filterSubject = base.filterSubject; + this.external = base.external; + } + + public T sourceName(String name) { + this.name = name; + return getThis(); + } + + public T name(String name) { + this.name = name; return getThis(); } @@ -128,6 +157,12 @@ public T external(External external) { this.external = external; return getThis(); } + + public T domain(String domain) { + String prefix = convertDomainToPrefix(domain); + external = prefix == null ? null : External.builder().api(prefix).build(); + return getThis(); + } } @Override @@ -138,7 +173,7 @@ public boolean equals(Object o) { SourceBase that = (SourceBase) o; if (startSeq != that.startSeq) return false; - if (sourceName != null ? !sourceName.equals(that.sourceName) : that.sourceName != null) return false; + if (name != null ? !name.equals(that.name) : that.name != null) return false; if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false; if (filterSubject != null ? !filterSubject.equals(that.filterSubject) : that.filterSubject != null) return false; @@ -148,7 +183,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = sourceName != null ? sourceName.hashCode() : 0; + int result = name != null ? name.hashCode() : 0; result = 31 * result + (int) (startSeq ^ (startSeq >>> 32)); result = 31 * result + (startTime != null ? startTime.hashCode() : 0); result = 31 * result + (filterSubject != null ? filterSubject.hashCode() : 0); diff --git a/src/main/java/io/nats/client/api/StreamConfiguration.java b/src/main/java/io/nats/client/api/StreamConfiguration.java index d3e415218..a98a415d0 100644 --- a/src/main/java/io/nats/client/api/StreamConfiguration.java +++ b/src/main/java/io/nats/client/api/StreamConfiguration.java @@ -57,6 +57,7 @@ public class StreamConfiguration implements JsonSerializable { private final boolean sealed; private final boolean allowRollup; private final boolean allowDirect; + private final boolean mirrorDirect; private final boolean denyDelete; private final boolean denyPurge; private final boolean discardNewPerSubject; @@ -100,6 +101,7 @@ static StreamConfiguration instance(String json) { builder.sealed(readBoolean(json, SEALED_RE)); builder.allowRollup(readBoolean(json, ALLOW_ROLLUP_HDRS_RE)); builder.allowDirect(readBoolean(json, ALLOW_DIRECT_RE)); + builder.mirrorDirect(readBoolean(json, MIRROR_DIRECT_RE)); builder.denyDelete(readBoolean(json, DENY_DELETE_RE)); builder.denyPurge(readBoolean(json, DENY_PURGE_RE)); builder.discardNewPerSubject(readBoolean(json, DISCARD_NEW_PER_SUBJECT_RE)); @@ -132,6 +134,7 @@ static StreamConfiguration instance(String json) { this.sealed = b.sealed; this.allowRollup = b.allowRollup; this.allowDirect = b.allowDirect; + this.mirrorDirect = b.mirrorDirect; this.denyDelete = b.denyDelete; this.denyPurge = b.denyPurge; this.discardNewPerSubject = b.discardNewPerSubject; @@ -176,6 +179,7 @@ public String toJson() { addFldWhenTrue(sb, SEALED, sealed); addFldWhenTrue(sb, ALLOW_ROLLUP_HDRS, allowRollup); addFldWhenTrue(sb, ALLOW_DIRECT, allowDirect); + addFldWhenTrue(sb, MIRROR_DIRECT, mirrorDirect); addFldWhenTrue(sb, DENY_DELETE, denyDelete); addFldWhenTrue(sb, DENY_PURGE, denyPurge); addFldWhenTrue(sb, DISCARD_NEW_PER_SUBJECT, discardNewPerSubject); @@ -369,6 +373,15 @@ public boolean getAllowDirect() { return allowDirect; } + /** + * Get the flag indicating if the stream allows + * higher performance and unified direct access for mirrors as well. + * @return the allows direct flag + */ + public boolean getMirrorDirect() { + return mirrorDirect; + } + /** * Get the flag indicating if deny delete is set for the stream * @return the deny delete flag @@ -414,6 +427,7 @@ public String toString() { ", duplicateWindow=" + duplicateWindow + ", allowRollup=" + allowRollup + ", allowDirect=" + allowDirect + + ", mirrorDirect=" + mirrorDirect + ", denyDelete=" + denyDelete + ", denyPurge=" + denyPurge + ", discardNewPerSubject=" + discardNewPerSubject + @@ -472,6 +486,7 @@ public static class Builder { private boolean sealed = false; private boolean allowRollup = false; private boolean allowDirect = false; + private boolean mirrorDirect = false; private boolean denyDelete = false; private boolean denyPurge = false; private boolean discardNewPerSubject = false; @@ -510,6 +525,7 @@ public Builder(StreamConfiguration sc) { this.sealed = sc.sealed; this.allowRollup = sc.allowRollup; this.allowDirect = sc.allowDirect; + this.mirrorDirect = sc.mirrorDirect; this.denyDelete = sc.denyDelete; this.denyPurge = sc.denyPurge; this.discardNewPerSubject = sc.discardNewPerSubject; @@ -557,8 +573,8 @@ public Builder subjects(Collection subjects) { } /** - * Sets the subjects in the StreamConfiguration. - * @param subjects the stream's subjects + * Adds unique subjects into the StreamConfiguration. + * @param subjects the stream's subjects to add * @return Builder */ public Builder addSubjects(String... subjects) { @@ -569,8 +585,8 @@ public Builder addSubjects(String... subjects) { } /** - * Sets the subjects in the StreamConfiguration. - * @param subjects the stream's subjects + * Adds unique subjects into the StreamConfiguration. + * @param subjects the stream's subjects to add * @return Builder */ public Builder addSubjects(Collection subjects) { @@ -778,7 +794,7 @@ public Builder sources(Source... sources) { } /** - * Sets the sources in the StreamConfiguration. + * Add the sources into the StreamConfiguration. * @param sources the stream's sources * @return Builder */ @@ -788,7 +804,7 @@ public Builder sources(Collection sources) { } /** - * Sets the sources in the StreamConfiguration. + * Add the sources into the StreamConfiguration. * @param sources the stream's sources * @return Builder */ @@ -812,6 +828,18 @@ public Builder addSources(Collection sources) { return this; } + /** + * Add a source into the StreamConfiguration. + * @param source a stream source + * @return Builder + */ + public Builder addSource(Source source) { + if (source != null && !this.sources.contains(source)) { + this.sources.add(source); + } + return this; + } + /** * Set whether to seal the stream. * INTERNAL USE ONLY. Scoped protected for test purposes. @@ -843,6 +871,16 @@ public Builder allowDirect(boolean allowDirect) { return this; } + /** + * Set whether to allow unified direct access for mirrors + * @param mirrorDirect the allow direct setting + * @return Builder + */ + public Builder mirrorDirect(boolean mirrorDirect) { + this.mirrorDirect = mirrorDirect; + return this; + } + /** * Set whether to deny deleting messages from the stream * @param denyDelete the deny delete setting diff --git a/src/main/java/io/nats/client/impl/NatsKeyValue.java b/src/main/java/io/nats/client/impl/NatsKeyValue.java index febc24f93..dad5e1fff 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValue.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValue.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; +import static io.nats.client.support.NatsConstants.DOT; import static io.nats.client.support.NatsJetStreamConstants.EXPECTED_LAST_SUB_SEQ_HDR; import static io.nats.client.support.NatsJetStreamConstants.JS_WRONG_LAST_SEQUENCE; import static io.nats.client.support.NatsKeyValueUtil.*; @@ -36,32 +37,54 @@ public class NatsKeyValue extends NatsFeatureBase implements KeyValue { private final String bucketName; private final String streamSubject; - private final String rawKeyPrefix; - private final String pubSubKeyPrefix; + private final String readPrefix; + private final String writePrefix; NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException { super(connection, kvo); this.bucketName = Validator.validateBucketName(bucketName, true); streamName = toStreamName(bucketName); + StreamInfo si; + try { + si = jsm.getStreamInfo(streamName); + } catch (JetStreamApiException e) { + // can't throw directly, that would be a breaking change + throw new IOException(e); + } + streamSubject = toStreamSubject(bucketName); - rawKeyPrefix = toKeyPrefix(bucketName); - if (kvo == null) { - pubSubKeyPrefix = rawKeyPrefix; + String readTemp = toKeyPrefix(bucketName); + + String writeTemp; + Mirror m = si.getConfiguration().getMirror(); + if (m != null) { + String bName = trimPrefix(m.getName()); + String mExtApi = m.getExternal() == null ? null : m.getExternal().getApi(); + if (mExtApi == null) { + writeTemp = toKeyPrefix(bName); + } + else { + readTemp = toKeyPrefix(bName); + writeTemp = mExtApi + DOT + toKeyPrefix(bName); + } } - else if (kvo.getJetStreamOptions().isDefaultPrefix()) { - pubSubKeyPrefix = rawKeyPrefix; + else if (kvo == null || kvo.getJetStreamOptions().isDefaultPrefix()) { + writeTemp = readTemp; } else { - pubSubKeyPrefix = kvo.getJetStreamOptions().getPrefix() + rawKeyPrefix; + writeTemp = kvo.getJetStreamOptions().getPrefix() + readTemp; } + + readPrefix = readTemp; + writePrefix = writeTemp; } - String rawKeySubject(String key) { - return rawKeyPrefix + key; + String readSubject(String key) { + return readPrefix + key; } - String pubSubKeySubject(String key) { - return pubSubKeyPrefix + key; + String writeSubject(String key) { + return writePrefix + key; } /** @@ -93,7 +116,7 @@ KeyValueEntry existingOnly(KeyValueEntry kve) { } KeyValueEntry _get(String key) throws IOException, JetStreamApiException { - MessageInfo mi = _getLast(rawKeySubject(key)); + MessageInfo mi = _getLast(readSubject(key)); return mi == null ? null : new KeyValueEntry(mi); } @@ -113,7 +136,7 @@ KeyValueEntry _get(String key, long revision) throws IOException, JetStreamApiEx */ @Override public long put(String key, byte[] value) throws IOException, JetStreamApiException { - return _publishWithNonWildcardKey(key, value, null).getSeqno(); + return _write(key, value, null).getSeqno(); } /** @@ -121,7 +144,7 @@ public long put(String key, byte[] value) throws IOException, JetStreamApiExcept */ @Override public long put(String key, String value) throws IOException, JetStreamApiException { - return put(key, value.getBytes(StandardCharsets.UTF_8)); + return _write(key, value.getBytes(StandardCharsets.UTF_8), null).getSeqno(); } /** @@ -160,7 +183,7 @@ public long create(String key, byte[] value) throws IOException, JetStreamApiExc public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException { validateNonWildcardKvKeyRequired(key); Headers h = new Headers().add(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision)); - return _publishWithNonWildcardKey(key, value, h).getSeqno(); + return _write(key, value, h).getSeqno(); } /** @@ -169,7 +192,7 @@ public long update(String key, byte[] value, long expectedRevision) throws IOExc @Override public void delete(String key) throws IOException, JetStreamApiException { validateNonWildcardKvKeyRequired(key); - _publishWithNonWildcardKey(key, null, getDeleteHeaders()); + _write(key, null, getDeleteHeaders()); } /** @@ -177,12 +200,12 @@ public void delete(String key) throws IOException, JetStreamApiException { */ @Override public void purge(String key) throws IOException, JetStreamApiException { - _publishWithNonWildcardKey(key, null, getPurgeHeaders()); + _write(key, null, getPurgeHeaders()); } - private PublishAck _publishWithNonWildcardKey(String key, byte[] data, Headers h) throws IOException, JetStreamApiException { + private PublishAck _write(String key, byte[] data, Headers h) throws IOException, JetStreamApiException { validateNonWildcardKvKeyRequired(key); - return js.publish(NatsMessage.builder().subject(pubSubKeySubject(key)).data(data).headers(h).build()); + return js.publish(NatsMessage.builder().subject(writeSubject(key)).data(data).headers(h).build()); } @Override @@ -204,7 +227,7 @@ public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueW @Override public List keys() throws IOException, JetStreamApiException, InterruptedException { List list = new ArrayList<>(); - visitSubject(rawKeySubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> { + visitSubject(readSubject(">"), DeliverPolicy.LastPerSubject, true, false, m -> { KeyValueOperation op = getOperation(m.getHeaders()); if (op == KeyValueOperation.PUT) { list.add(new BucketAndKey(m).key); @@ -220,7 +243,7 @@ public List keys() throws IOException, JetStreamApiException, Interrupte public List history(String key) throws IOException, JetStreamApiException, InterruptedException { validateNonWildcardKvKeyRequired(key); List list = new ArrayList<>(); - visitSubject(rawKeySubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m))); + visitSubject(readSubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m))); return list; } @@ -267,12 +290,12 @@ else if (dmThresh == 0) { }); for (String key : keep0List) { - jsm.purgeStream(streamName, PurgeOptions.subject(rawKeySubject(key))); + jsm.purgeStream(streamName, PurgeOptions.subject(readSubject(key))); } for (String key : keep1List) { PurgeOptions po = PurgeOptions.builder() - .subject(rawKeySubject(key)) + .subject(readSubject(key)) .keep(1) .build(); jsm.purgeStream(streamName, po); diff --git a/src/main/java/io/nats/client/impl/NatsKeyValueManagement.java b/src/main/java/io/nats/client/impl/NatsKeyValueManagement.java index 7c8d7cea0..75a492540 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValueManagement.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValueManagement.java @@ -42,6 +42,9 @@ public class NatsKeyValueManagement implements KeyValueManagement { @Override public KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException { StreamConfiguration sc = config.getBackingConfig(); + + // most validation / KVC setup is done in the KeyValueConfiguration Builder + // but this is done here because the context has a connection which has the server info with a version if ( serverOlderThan272 ) { sc = StreamConfiguration.builder(sc).discardPolicy(null).build(); // null discard policy will use default } diff --git a/src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java b/src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java index f675fdb4b..51373ae2a 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java @@ -54,6 +54,6 @@ public void onMessage(Message m) throws InterruptedException { } }; - finishInit(kv, kv.rawKeySubject(keyPattern), deliverPolicy, headersOnly, handler); + finishInit(kv, kv.readSubject(keyPattern), deliverPolicy, headersOnly, handler); } } diff --git a/src/main/java/io/nats/client/impl/NatsWatchSubscription.java b/src/main/java/io/nats/client/impl/NatsWatchSubscription.java index 469d14c48..2287884a1 100644 --- a/src/main/java/io/nats/client/impl/NatsWatchSubscription.java +++ b/src/main/java/io/nats/client/impl/NatsWatchSubscription.java @@ -22,10 +22,8 @@ import java.io.IOException; public class NatsWatchSubscription implements AutoCloseable { - private static final Object dispatcherLock = new Object(); - private static NatsDispatcher dispatcher; - private final JetStream js; + private NatsDispatcher dispatcher; private JetStreamSubscription sub; public NatsWatchSubscription(JetStream js) { @@ -53,7 +51,8 @@ protected void finishInit(NatsFeatureBase fb, String subscribeSubject, DeliverPo .build()) .build(); - sub = js.subscribe(subscribeSubject, getDispatcher(js), handler, false, pso); + dispatcher = (NatsDispatcher) ((NatsJetStream) js).conn.createDispatcher(); + sub = js.subscribe(subscribeSubject, dispatcher, handler, false, pso); if (!handler.endOfDataSent) { long pending = sub.getConsumerInfo().getCalculatedPending(); if (pending == 0) { @@ -76,17 +75,8 @@ public void sendEndOfData() { } } - private static Dispatcher getDispatcher(JetStream js) { - synchronized (dispatcherLock) { - if (dispatcher == null) { - dispatcher = (NatsDispatcher) ((NatsJetStream) js).conn.createDispatcher(); - } - return dispatcher; - } - } - public void unsubscribe() { - synchronized (dispatcherLock) { + if (dispatcher != null) { dispatcher.unsubscribe(sub); if (dispatcher.getSubscriptionHandlers().size() == 0) { dispatcher.connection.closeDispatcher(dispatcher); diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 5da72a56b..766751609 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -25,6 +25,7 @@ public interface ApiConstants { String ACTIVE = "active"; String ALLOW_ROLLUP_HDRS = "allow_rollup_hdrs"; String ALLOW_DIRECT = "allow_direct"; + String MIRROR_DIRECT = "mirror_direct"; String API = "api"; String AUTH_REQUIRED = "auth_required"; String BACKOFF = "backoff"; @@ -253,6 +254,7 @@ public interface ApiConstants { Pattern MEMORY_RE = integer_pattern(MEMORY); Pattern MEMORY_MAX_STREAM_BYTES_RE= integer_pattern(MEMORY_MAX_STREAM_BYTES); Pattern MESSAGES_RE = integer_pattern(MESSAGES); + Pattern MIRROR_DIRECT_RE = boolean_pattern(MIRROR_DIRECT); Pattern MTIME_RE = string_pattern(MTIME); Pattern NAME_RE = string_pattern(NAME); Pattern NO_ACK_RE = boolean_pattern(NO_ACK); diff --git a/src/main/java/io/nats/client/support/BuilderBase.java b/src/main/java/io/nats/client/support/BuilderBase.java index 3819eee4d..6d99360ee 100644 --- a/src/main/java/io/nats/client/support/BuilderBase.java +++ b/src/main/java/io/nats/client/support/BuilderBase.java @@ -26,11 +26,6 @@ public abstract class BuilderBase { public static final int DEFAULT_OTHER_ALLOCATION = 64; public static final byte[] NULL = "null".getBytes(US_ASCII); - protected BuilderBase(Charset defaultCharset) { - this.defaultCharset = defaultCharset; - this.allocationSize = _defaultCharsetAllocationSize(); - } - protected BuilderBase(Charset defaultCharset, int allocationSize) { this.defaultCharset = defaultCharset; _setAllocationSize(allocationSize); diff --git a/src/main/java/io/nats/client/support/JsonUtils.java b/src/main/java/io/nats/client/support/JsonUtils.java index 1cbd1a283..d6f0a5249 100644 --- a/src/main/java/io/nats/client/support/JsonUtils.java +++ b/src/main/java/io/nats/client/support/JsonUtils.java @@ -628,6 +628,36 @@ public static String readString(String json, Pattern pattern, String dflt) { return m.find() ? jsonDecode(m.group(1)) : dflt; } + public static String readStringMayHaveQuotes(String json, String field, String dflt) { + String jfield = "\"" + field + "\""; + int at = json.indexOf(jfield); + if (at != -1) { + at = json.indexOf('"', at + jfield.length()); + StringBuilder sb = new StringBuilder(); + while (true) { + char c = json.charAt(++at); + if (c == '\\') { + char c2 = json.charAt(++at); + if (c2 == '"') { + sb.append('"'); + } + else { + sb.append(c); + sb.append(c2); + } + } + else if (c == '"') { + break; + } + else { + sb.append(c); + } + } + return jsonDecode(sb.toString()); + } + return dflt; + } + public static byte[] readBytes(String json, Pattern pattern) { String s = readString(json, pattern, null); return s == null ? null : s.getBytes(StandardCharsets.US_ASCII); diff --git a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java index 5409db3ec..9fac2ea5f 100644 --- a/src/main/java/io/nats/client/support/NatsJetStreamConstants.java +++ b/src/main/java/io/nats/client/support/NatsJetStreamConstants.java @@ -13,7 +13,7 @@ public interface NatsJetStreamConstants { int MAX_HISTORY_PER_KEY = 64; String PREFIX_DOLLAR_JS_DOT = "$JS."; - String PREFIX_API_DOT = "API."; + String PREFIX_API = "API"; String DEFAULT_API_PREFIX = "$JS.API."; String JS_ACK_SUBJECT_PREFIX = "$JS.ACK."; diff --git a/src/main/java/io/nats/client/support/NatsKeyValueUtil.java b/src/main/java/io/nats/client/support/NatsKeyValueUtil.java index 6ef4a61f7..7c63ddc71 100644 --- a/src/main/java/io/nats/client/support/NatsKeyValueUtil.java +++ b/src/main/java/io/nats/client/support/NatsKeyValueUtil.java @@ -47,6 +47,17 @@ public static String toKeyPrefix(String bucketName) { return KV_SUBJECT_PREFIX + bucketName + DOT; } + public static boolean hasPrefix(String bucketName) { + return bucketName.startsWith(KV_STREAM_PREFIX); + } + + public static String trimPrefix(String bucketName) { + if (bucketName.startsWith(KV_STREAM_PREFIX)) { + return bucketName.substring(KV_STREAM_PREFIX.length()); + } + return bucketName; + } + public static String getOperationHeader(Headers h) { return h == null ? null : h.getFirst(KV_OPERATION_HEADER_KEY); } diff --git a/src/test/java/io/nats/client/JetStreamOptionsTests.java b/src/test/java/io/nats/client/JetStreamOptionsTests.java index 7e5a40cb2..228002579 100644 --- a/src/test/java/io/nats/client/JetStreamOptionsTests.java +++ b/src/test/java/io/nats/client/JetStreamOptionsTests.java @@ -157,7 +157,7 @@ public void testDomainValidation() { private void assertValidDomain(String domain) { JetStreamOptions jso = JetStreamOptions.builder().domain(domain).build(); String prefixWithDot = domain.endsWith(DOT) ? domain : domain + DOT; - String expected = PREFIX_DOLLAR_JS_DOT + prefixWithDot + PREFIX_API_DOT; + String expected = PREFIX_DOLLAR_JS_DOT + prefixWithDot + PREFIX_API + DOT; assertEquals(expected, jso.getPrefix()); } diff --git a/src/test/java/io/nats/client/api/StreamConfigurationTests.java b/src/test/java/io/nats/client/api/StreamConfigurationTests.java index 9b34bf142..b332dfe83 100644 --- a/src/test/java/io/nats/client/api/StreamConfigurationTests.java +++ b/src/test/java/io/nats/client/api/StreamConfigurationTests.java @@ -47,6 +47,7 @@ public void testRoundTrip() throws Exception { .templateOwner(null) .allowRollup(false) .allowDirect(false) + .mirrorDirect(false) .sealed(false) .build(); JetStreamManagement jsm = nc.jetStreamManagement(); @@ -92,6 +93,7 @@ public void testConstruction() { .sealed(testSc.getSealed()) .allowRollup(testSc.getAllowRollup()) .allowDirect(testSc.getAllowDirect()) + .mirrorDirect(testSc.getMirrorDirect()) .denyDelete(testSc.getDenyDelete()) .denyPurge(testSc.getDenyPurge()) .discardNewPerSubject(testSc.isDiscardNewPerSubject()); @@ -104,6 +106,16 @@ public void testConstruction() { sources.add(copy); validate(builder.addSources(sources).build(), false); + // covering add a single source + sources = new ArrayList<>(testSc.getSources()); + builder.sources(new ArrayList<>()); // clears the sources + builder.addSource(null); // coverage + for (Source source : sources) { + builder.addSource(source); + } + builder.addSource(sources.get(0)); + validate(builder.build(), false); + // equals and hashcode coverage External external = copy.getExternal(); @@ -117,6 +129,10 @@ public void testConstruction() { if (l1.startsWith("{")) { Mirror m1 = new Mirror(l1); assertEquals(m1, m1); + assertEquals(m1, Mirror.builder(m1).build()); + Source s1 = new Source(l1); + assertEquals(s1, s1); + assertEquals(s1, Source.builder(s1).build()); //this provides testing coverage //noinspection ConstantConditions,SimplifiableAssertion assertTrue(!m1.equals(null)); @@ -124,11 +140,14 @@ public void testConstruction() { for (String l2 : lines) { if (l2.startsWith("{")) { Mirror m2 = new Mirror(l2); + Source s2 = new Source(l2); if (l1.equals(l2)) { assertEquals(m1, m2); + assertEquals(s1, s2); } else { assertNotEquals(m1, m2); + assertNotEquals(s1, s2); } } } @@ -363,6 +382,7 @@ private void validate(StreamConfiguration sc, boolean serverTest) { assertTrue(sc.isDiscardNewPerSubject()); assertTrue(sc.getAllowRollup()); assertTrue(sc.getAllowDirect()); + assertTrue(sc.getMirrorDirect()); assertEquals(5, sc.getReplicas()); assertEquals("twnr", sc.getTemplateOwner()); diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 27dbe2265..9b614c913 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -194,6 +194,12 @@ public void testUpdateStream() throws Exception { jsm.addStream(getTestStreamConfigurationBuilder().allowDirect(false).build()); jsm.updateStream(getTestStreamConfigurationBuilder().allowDirect(true).build()); jsm.updateStream(getTestStreamConfigurationBuilder().allowDirect(false).build()); + + // allowed to change Mirror Direct + jsm.deleteStream(STREAM); + jsm.addStream(getTestStreamConfigurationBuilder().mirrorDirect(false).build()); + jsm.updateStream(getTestStreamConfigurationBuilder().mirrorDirect(true).build()); + jsm.updateStream(getTestStreamConfigurationBuilder().mirrorDirect(false).build()); }); } diff --git a/src/test/java/io/nats/client/impl/JetStreamMirrorTests.java b/src/test/java/io/nats/client/impl/JetStreamMirrorAndSourcesTests.java similarity index 99% rename from src/test/java/io/nats/client/impl/JetStreamMirrorTests.java rename to src/test/java/io/nats/client/impl/JetStreamMirrorAndSourcesTests.java index f0f2bab98..bea1e2391 100644 --- a/src/test/java/io/nats/client/impl/JetStreamMirrorTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamMirrorAndSourcesTests.java @@ -24,7 +24,7 @@ import static org.junit.jupiter.api.Assertions.*; -public class JetStreamMirrorTests extends JetStreamTestBase { +public class JetStreamMirrorAndSourcesTests extends JetStreamTestBase { static final String S1 = stream(1); static final String S2 = stream(2); static final String S3 = stream(3); diff --git a/src/test/java/io/nats/client/impl/KeyValueTests.java b/src/test/java/io/nats/client/impl/KeyValueTests.java index d7c810779..85f0107c4 100644 --- a/src/test/java/io/nats/client/impl/KeyValueTests.java +++ b/src/test/java/io/nats/client/impl/KeyValueTests.java @@ -23,6 +23,7 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import static io.nats.client.JetStreamOptions.DEFAULT_JS_OPTIONS; @@ -1093,6 +1094,7 @@ private void assertKveAccountGet(KeyValue kvUserA, KeyValue kvUserI, String key, assertEquals(KeyValueOperation.PUT, kveUserA.getOperation()); } + @SuppressWarnings({"SimplifiableAssertion", "ConstantConditions"}) @Test public void testCoverBucketAndKey() { NatsKeyValueUtil.BucketAndKey bak1 = new NatsKeyValueUtil.BucketAndKey(DOT + BUCKET + DOT + KEY); @@ -1114,6 +1116,15 @@ public void testCoverBucketAndKey() { assertFalse(bak4.equals(new Object())); } + @Test + public void testCoverPrefix() { + assertTrue(NatsKeyValueUtil.hasPrefix("KV_has")); + assertFalse(NatsKeyValueUtil.hasPrefix("doesn't")); + assertEquals("has", NatsKeyValueUtil.trimPrefix("KV_has")); + assertEquals("doesn't", NatsKeyValueUtil.trimPrefix("doesn't")); + + } + @Test public void testKeyValueEntryEqualsImpl() throws Exception { runInJsServer(nc -> { @@ -1217,4 +1228,171 @@ public void testKeyValuePurgeOptionsBuilderCoverage() { KeyValuePurgeOptions.builder().deleteMarkersNoThreshold().build() .getDeleteMarkersThresholdMillis()); } + + @Test + public void testCreateDiscardPolicy() throws Exception { + runInJsServer(nc -> { + KeyValueManagement kvm = nc.keyValueManagement(); + + // create bucket + KeyValueStatus status = kvm.create(KeyValueConfiguration.builder() + .name(bucket(1)) + .storageType(StorageType.Memory) + .build()); + + DiscardPolicy dp = status.getConfiguration().getBackingConfig().getDiscardPolicy(); + if (nc.getServerInfo().isSameOrNewerThanVersion("2.7.2")) { + assertEquals(DiscardPolicy.New, dp); + } + else { + assertTrue(dp == DiscardPolicy.New || dp == DiscardPolicy.Old); + } + }); + } + + @Test + public void testMiscCoverage() throws Exception { + runInJsServer(nc -> { + KeyValueManagement kvm = nc.keyValueManagement(); + + // create bucket + kvm.create(KeyValueConfiguration.builder() + .name(BUCKET) + .storageType(StorageType.Memory) + .build()); + + KeyValue kv = nc.keyValue(BUCKET); + kv.put("a", "a"); + KeyValueEntry kve = kv.get("a"); + assertThrows(NumberFormatException.class, kve::getValueAsLong); + + kv.delete("a"); + List list = kv.history("a"); + assertNull(list.get(0).getValueAsString()); + assertNull(list.get(0).getValueAsLong()); + }); + } + + @Test + public void testMirrorSoureceBuilderPrefixConversion() throws Exception { + KeyValueConfiguration kvc = KeyValueConfiguration.builder() + .name(BUCKET) + .mirror(Mirror.builder().name("name").build()) + .build(); + assertEquals("KV_name", kvc.getBackingConfig().getMirror().getName()); + + kvc = KeyValueConfiguration.builder() + .name(BUCKET) + .mirror(Mirror.builder().name("KV_name").build()) + .build(); + assertEquals("KV_name", kvc.getBackingConfig().getMirror().getName()); + + Source s1 = Source.builder().name("s1").build(); + Source s2 = Source.builder().name("s2").build(); + Source s3 = Source.builder().name("s3").build(); + Source s4 = Source.builder().name("s4").build(); + Source s5 = Source.builder().name("KV_s5").build(); + Source s6 = Source.builder().name("KV_s6").build(); + + kvc = KeyValueConfiguration.builder() + .name(BUCKET) + .sources(s3, s4) + .sources(Arrays.asList(s1, s2)) + .addSources(s1, s2) + .addSources(Arrays.asList(s1, s2, null)) + .addSources(s3, s4) + .addSource(null) + .addSource(s5) + .addSource(s5) + .addSources(s6) + .addSources((Source[])null) + .addSources((Collection)null) + .build(); + + assertEquals(6, kvc.getBackingConfig().getSources().size()); + List names = new ArrayList<>(); + for (Source source : kvc.getBackingConfig().getSources()) { + names.add(source.getName()); + } + assertTrue(names.contains("KV_s1")); + assertTrue(names.contains("KV_s2")); + assertTrue(names.contains("KV_s3")); + assertTrue(names.contains("KV_s4")); + assertTrue(names.contains("KV_s5")); + assertTrue(names.contains("KV_s6")); + } + + @Test + public void testKeyValueMirrorCrossDomains() throws Exception { + runInJsHubLeaf((hub, leaf) -> { + KeyValueManagement hubKvm = hub.keyValueManagement(); + KeyValueManagement leafKvm = leaf.keyValueManagement(); + + // Create main KV on HUB + KeyValueStatus hubStatus = hubKvm.create(KeyValueConfiguration.builder() + .name("TEST") + .storageType(StorageType.Memory) + .build()); + + KeyValue hubKv = hub.keyValue("TEST"); + hubKv.put("key1", "aaa0"); + hubKv.put("key2", "bb0"); + hubKv.put("key3", "c0"); + hubKv.delete("key3"); + + leafKvm.create(KeyValueConfiguration.builder() + .name("MIRROR") + .mirror(Mirror.builder() + .sourceName("TEST") + .domain(null) // just for coverage! + .domain("HUB") // it will take this since it comes last + .build()) + .build()); + + sleep(200); // make sure things get a chance to propagate + StreamInfo si = leaf.jetStreamManagement().getStreamInfo("KV_MIRROR"); + assertTrue(si.getConfiguration().getMirrorDirect()); + assertEquals(3, si.getStreamState().getMsgCount()); + + KeyValue leafKv = leaf.keyValue("MIRROR"); + _testMirror(hubKv, leafKv, 1); + + // Bind through leafnode connection but to origin KV. + KeyValue hubViaLeafKv = + leaf.keyValue("TEST", KeyValueOptions.builder().jsDomain("HUB").build()); + _testMirror(hubKv, hubViaLeafKv, 2); + }); + } + + private void _testMirror(KeyValue okv, KeyValue mkv, int num) throws Exception { + mkv.put("key1", "aaa" + num); + mkv.put("key3", "c" + num); + + sleep(200); // make sure things get a chance to propagate + KeyValueEntry kve = mkv.get("key3"); + assertEquals("c" + num, kve.getValueAsString()); + + mkv.delete("key3"); + sleep(200); // make sure things get a chance to propagate + assertNull(mkv.get("key3")); + + kve = mkv.get("key1"); + assertEquals("aaa" + num, kve.getValueAsString()); + + // Make sure we can create a watcher on the mirror KV. + TestKeyValueWatcher mWatcher = new TestKeyValueWatcher("mirrorWatcher" + num, false); + try (NatsKeyValueWatchSubscription mWatchSub = mkv.watchAll(mWatcher)) { + sleep(200); // give the messages time to propagate + } + validateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.DELETE}, mWatcher); + + // Does the origin data match? + if (okv != null) { + TestKeyValueWatcher oWatcher = new TestKeyValueWatcher("originWatcher" + num, false); + try (NatsKeyValueWatchSubscription oWatchSub = okv.watchAll(oWatcher)) { + sleep(200); // give the messages time to propagate + } + validateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.DELETE}, oWatcher); + } + } } diff --git a/src/test/java/io/nats/client/impl/RequestTests.java b/src/test/java/io/nats/client/impl/RequestTests.java index 9d2218bc6..4bc28b00d 100644 --- a/src/test/java/io/nats/client/impl/RequestTests.java +++ b/src/test/java/io/nats/client/impl/RequestTests.java @@ -408,8 +408,9 @@ public void testRequireCleanupOnCancel() throws IOException, InterruptedExceptio assertEquals(Connection.Status.CONNECTED, nc.getStatus(), "Connected Status"); Future incoming = nc.request("subject", null); - incoming.cancel(true); - + // incoming.cancel(true); // sff I don't think this call helps the test or matters. + // This flaps sometimes. I think it's better to check it as soon as possible + // hence removing the cancel call assertEquals(1, ((NatsStatistics)nc.getStatistics()).getOutstandingRequests()); } finally { nc.close(); diff --git a/src/test/java/io/nats/client/support/ByteArrayBuilderTests.java b/src/test/java/io/nats/client/support/ByteArrayBuilderTests.java index de1c8a6eb..3c69a8f90 100644 --- a/src/test/java/io/nats/client/support/ByteArrayBuilderTests.java +++ b/src/test/java/io/nats/client/support/ByteArrayBuilderTests.java @@ -2,6 +2,9 @@ import org.junit.jupiter.api.Test; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.BufferOverflowException; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.Collections; @@ -81,13 +84,31 @@ private static void _equalsBytes(ByteArrayPrimitiveBuilder bab) { } @Test - public void copyTo() { + public void copyTo() throws IOException { ByteArrayBuilder bab = new ByteArrayBuilder(); bab.append("0123456789"); byte[] target = "AAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); assertEquals(10, bab.copyTo(target, 0)); assertEquals(10, bab.copyTo(target, 10)); assertEquals("01234567890123456789", new String(target)); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + bab.copyTo(out); + assertEquals(10, out.toString().length()); + } + + @Test + public void copyToPrimitiveBuilder() throws IOException { + ByteArrayPrimitiveBuilder bab = new ByteArrayPrimitiveBuilder(); + bab.append("0123456789"); + byte[] target = "AAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); + assertEquals(10, bab.copyTo(target, 0)); + assertEquals(10, bab.copyTo(target, 10)); + assertEquals("01234567890123456789", new String(target)); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + bab.copyTo(out); + assertEquals(10, out.toString().length()); } @Test @@ -231,13 +252,27 @@ public void miscCoverage() { ; assertEquals(" \r\nnullfoonullbar4273zbazbazz", bab.toString()); assertEquals(29, bab.length()); + assertEquals(DEFAULT_ASCII_ALLOCATION, bab.capacity()); bab.append(bab); assertEquals(" \r\nnullfoonullbar4273zbazbazz \r\nnullfoonullbar4273zbazbazz", bab.toString()); assertEquals(58, bab.length()); + assertEquals(DEFAULT_ASCII_ALLOCATION * 2, bab.capacity()); bab.setAllocationSize(100); bab.clear(); assertEquals(0, bab.length()); + assertEquals(DEFAULT_ASCII_ALLOCATION * 2, bab.capacity()); + + bab.appendUnchecked((byte)'0'); + bab.appendUnchecked("123456789".getBytes()); + assertEquals(10, bab.length()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes(), 0, 10); + assertEquals(60, bab.length()); + assertThrows(BufferOverflowException.class, () -> bab.appendUnchecked("12345".getBytes())); } @Test @@ -263,13 +298,28 @@ public void miscCoverageByteArrayPrimitiveBuilder() { ; assertEquals(" \r\nnullfoonullbar4273zbazbazz", bab.toString()); assertEquals(29, bab.length()); + assertEquals(DEFAULT_ASCII_ALLOCATION, bab.capacity()); + bab.append(bab); assertEquals(" \r\nnullfoonullbar4273zbazbazz \r\nnullfoonullbar4273zbazbazz", bab.toString()); assertEquals(58, bab.length()); + assertEquals(DEFAULT_ASCII_ALLOCATION * 2, bab.capacity()); bab.setAllocationSize(DEFAULT_ASCII_ALLOCATION + 1); bab.clear(); assertEquals(0, bab.length()); assertEquals(DEFAULT_ASCII_ALLOCATION * 2, bab.getAllocationSize()); + assertEquals(DEFAULT_ASCII_ALLOCATION * 2, bab.capacity()); + + bab.appendUnchecked((byte)'0'); + bab.appendUnchecked("123456789".getBytes()); + assertEquals(10, bab.length()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + bab.appendUnchecked("1234567890".getBytes()); + assertEquals(60, bab.length()); + assertThrows(ArrayIndexOutOfBoundsException.class, () -> bab.appendUnchecked("12345".getBytes())); } } diff --git a/src/test/java/io/nats/client/support/JsonUtilsTests.java b/src/test/java/io/nats/client/support/JsonUtilsTests.java index e1f076ff4..ed1241c21 100644 --- a/src/test/java/io/nats/client/support/JsonUtilsTests.java +++ b/src/test/java/io/nats/client/support/JsonUtilsTests.java @@ -14,6 +14,7 @@ package io.nats.client.support; import io.nats.client.PurgeOptions; +import io.nats.client.impl.Headers; import io.nats.client.utils.ResourceUtils; import org.junit.jupiter.api.Test; @@ -25,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import static io.nats.client.support.ApiConstants.DESCRIPTION; import static io.nats.client.support.Encoding.jsonDecode; import static io.nats.client.support.Encoding.jsonEncode; import static io.nats.client.support.JsonUtils.*; @@ -221,6 +223,15 @@ public void testAddFields() { addFieldWhenGtZero(sb, "longgt0", 1L); assertEquals(110, sb.length()); + + addField(sb, "null-header", (Headers)null); + assertEquals(110, sb.length()); + + addField(sb, "null-header", new Headers()); + assertEquals(110, sb.length()); + + addField(sb, "header", new Headers().add("foo", "bar").add("foo", "baz")); + assertEquals(141, sb.length()); } static final String EXPECTED_LIST_JSON = "{\"a1\":[\"one\"],\"a2\":[\"two\",\"too\"],\"l1\":[\"one\"],\"l2\":[\"two\",\"too\"],\"j1\":[{\"filter\":\"sub1\",\"keep\":421}],\"j2\":[{\"filter\":\"sub2\",\"seq\":732},{\"filter\":\"sub3\"}],\"d1\":[1000000],\"d2\":[2000000,3000000]}"; @@ -402,4 +413,11 @@ public void testMiscCoverage() { assertEquals(0, getMapOfLists("\"bad\": ").size()); assertEquals("\"field\": ", removeObject("\"field\": ", "notfound")); } + + @Test + public void testReadStringMayHaveQuotes() { + String json = "{\"num\":1,\"description\":\"q\\\"quoted\\\"q tab\\ttab =\\u003d=\"}"; + assertNull(readStringMayHaveQuotes(json, "NotThere", null)); + assertEquals("q\"quoted\"q tab\ttab ===", readStringMayHaveQuotes(json, DESCRIPTION, null)); + } } diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index d412708a6..e1572dcff 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -63,6 +63,10 @@ public interface InServerTest { void test(Connection nc) throws Exception; } + public interface TwoServerTest { + void test(Connection nc1, Connection nc2) throws Exception; + } + public static void runInServer(InServerTest inServerTest) throws Exception { runInServer(false, false, inServerTest); } @@ -127,6 +131,46 @@ public static void runInExternalServer(String url, InServerTest inServerTest) th } } + public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception { + int hubPort = NatsTestServer.nextPort(); + int hubLeafPort = NatsTestServer.nextPort(); + int leafPort = NatsTestServer.nextPort(); + + String[] hubInserts = new String[] { + "server_name: HUB", + "jetstream {", + " domain: HUB", + "}", + "leafnodes {", + " listen = 127.0.0.1:" + hubLeafPort, + "}" + }; + + String[] leafInserts = new String[] { + "server_name: LEAF", + "jetstream {", + " domain: LEAF", + "}", + "leafnodes {", + " remotes = [ { url: \"leaf://127.0.0.1:" + hubLeafPort + "\" } ]", + "}" + }; + + try (NatsTestServer hub = new NatsTestServer(hubPort, false, true, null, hubInserts, null); + Connection nchub = standardConnection(hub.getURI()); + NatsTestServer leaf = new NatsTestServer(leafPort, false, true, null, leafInserts, null); + Connection ncleaf = standardConnection(leaf.getURI()) + ) { + try { + twoServerTest.test(nchub, ncleaf); + } + finally { + cleanupJs(nchub); + cleanupJs(ncleaf); + } + } + } + private static void cleanupJs(Connection c) { try { diff --git a/src/test/resources/data/StreamConfiguration.json b/src/test/resources/data/StreamConfiguration.json index 4e7eed58e..8094ba70b 100644 --- a/src/test/resources/data/StreamConfiguration.json +++ b/src/test/resources/data/StreamConfiguration.json @@ -21,6 +21,7 @@ "discard_new_per_subject": true, "allow_rollup_hdrs": true, "allow_direct": true, + "mirror_direct": true, "placement": { "cluster": "clstr", "tags": ["tag1", "tag2"]