diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java
index b26d40d5e..2f2da8a3e 100644
--- a/src/main/java/io/nats/client/JetStream.java
+++ b/src/main/java/io/nats/client/JetStream.java
@@ -358,13 +358,13 @@ public interface JetStream {
*
See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
- * @param subject the subject to subscribe to
+ * @param subscribeSubject the subject to subscribe to
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException;
/**
* Create a synchronous subscription to the specified subject.
@@ -375,14 +375,14 @@ public interface JetStream {
*
See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
- * @param subject the subject to subscribe to
+ * @param subscribeSubject the subject to subscribe to
* @param options optional subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create a synchronous subscription to the specified subject.
@@ -393,7 +393,7 @@ public interface JetStream {
*
See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for
* information about creating an asynchronous subscription with callbacks.
*
- * @param subject the subject to subscribe to
+ * @param subscribeSubject the subject to subscribe to
* @param queue the optional queue group to join
* @param options optional subscription options
* @return The subscription
@@ -401,14 +401,14 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
- * @param subject The subject to subscribe to
+ * @param subscribeSubject The subject to subscribe to
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param autoAck Whether to auto ack
@@ -417,14 +417,13 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
- *
- * @param subject The subject to subscribe to.
+ * @param subscribeSubject The subject to subscribe to.
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param autoAck Whether to auto ack
@@ -434,14 +433,14 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject under the control of the
* specified dispatcher. Since a MessageHandler is also required, the Dispatcher will
* not prevent duplicate subscriptions from being made.
*
- * @param subject The subject to subscribe to.
+ * @param subscribeSubject The subject to subscribe to.
* @param queue the optional queue group to join
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
@@ -452,22 +451,22 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create a subscription to the specified subject in the mode of pull, with additional options
- * @param subject The subject to subscribe to
+ * @param subscribeSubject The subject to subscribe to
* @param options pull subscription options
* @return The subscription
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Create an asynchronous subscription to the specified subject in the mode of pull, with additional options
- * @param subject The subject to subscribe to
+ * @param subscribeSubject The subject to subscribe to
* @param dispatcher The dispatcher to handle this subscription
* @param handler The target for the messages
* @param options pull subscription options
@@ -476,7 +475,7 @@ public interface JetStream {
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
- JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException;
+ JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException;
/**
* Get a stream context for a specific named stream. Verifies that the stream exists.
diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java
index 23223e111..8a16c76d1 100644
--- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java
+++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java
@@ -852,7 +852,11 @@ public Builder maxDeliver(long maxDeliver) {
* @return Builder
*/
public Builder filterSubject(String filterSubject) {
- return filterSubjects(Collections.singletonList(filterSubject));
+ this.filterSubjects.clear();
+ if (!nullOrEmpty(filterSubject)) {
+ this.filterSubjects.add(filterSubject);
+ }
+ return this;
}
diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java
index bafb1c1ec..e85e1adfa 100644
--- a/src/main/java/io/nats/client/impl/NatsJetStream.java
+++ b/src/main/java/io/nats/client/impl/NatsJetStream.java
@@ -234,22 +234,21 @@ MessageManager createMessageManager(
MessageManagerFactory _pullOrderedMessageManagerFactory =
(mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new OrderedPullMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode);
- JetStreamSubscription createSubscription(String subscribeSubject,
+ JetStreamSubscription createSubscription(String userSubscribeSubject,
+ PushSubscribeOptions pushSubscribeOptions,
+ PullSubscribeOptions pullSubscribeOptions,
String queueName,
NatsDispatcher dispatcher,
MessageHandler userHandler,
- boolean isAutoAck,
- PushSubscribeOptions pushSubscribeOptions,
- PullSubscribeOptions pullSubscribeOptions
+ boolean isAutoAck
) throws IOException, JetStreamApiException {
- // 0. parameter notes. For those relating to the callers, you can see all the callers further down in this source file.
- // - subscribeSubject will have been resolved by the caller from their method parameter or the consumer config
+ // Parameter notes. For those relating to the callers, you can see all the callers further down in this source file.
// - pull subscribe callers guarantee that pullSubscribeOptions is not null
// - qgroup is always null with pull callers
// - callers only ever provide one of the subscribe options
- // 1. Prepare for all the validation
+ // 1. Initial prep and validation
boolean isPullMode = pullSubscribeOptions != null;
SubscribeOptions so;
@@ -290,7 +289,7 @@ JetStreamSubscription createSubscription(String subscribeSubject,
}
}
- // 2A. Flow Control / heartbeat not always valid
+ // 1B. Flow Control / heartbeat not always valid
if (userCC.getIdleHeartbeat() != null && userCC.getIdleHeartbeat().toMillis() > 0) {
if (isPullMode) {
throw JsSubFcHbNotValidPull.instance();
@@ -300,10 +299,31 @@ JetStreamSubscription createSubscription(String subscribeSubject,
}
}
- // 2B. Did they tell me what stream? No? look it up.
+ // 2. figure out user provided subjects and prepare the userCcFilterSubjects
+ userSubscribeSubject = emptyAsNull(userSubscribeSubject);
+ List userCcFilterSubjects = new ArrayList<>();
+ if (userCC.getFilterSubject() == null) { // empty filterSubjects gives gives null
+ // userCC.filterSubjects empty, populate userCcFilterSubjects w/userSubscribeSubject if possible
+ if (userSubscribeSubject != null) {
+ userCcFilterSubjects.add(userSubscribeSubject);
+ }
+ }
+ else {
+ // userCC.filterSubjects not empty, validate them
+ userCcFilterSubjects.addAll(userCC.getFilterSubjects());
+ // If userSubscribeSubject is provided it must be one of the filter subjects.
+ if (userSubscribeSubject != null && !userCcFilterSubjects.contains(userSubscribeSubject)) {
+ throw JsSubSubjectDoesNotMatchFilter.instance();
+ }
+ }
+
+ // 3. Did they tell me what stream? No? look it up.
final String settledStream;
if (stream == null) {
- settledStream = lookupStreamBySubject(subscribeSubject);
+ if (userCcFilterSubjects.isEmpty()) {
+ throw new IllegalArgumentException("Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject.");
+ }
+ settledStream = lookupStreamBySubject(userCcFilterSubjects.get(0));
if (settledStream == null) {
throw JsSubNoMatchingStreamForSubject.instance();
}
@@ -319,7 +339,8 @@ JetStreamSubscription createSubscription(String subscribeSubject,
}
String inboxDeliver = userCC.getDeliverSubject();
- // 3. Does this consumer already exist? FastBind bypasses the lookup; the dev better know what they are doing
+ // 4. Does this consumer already exist? FastBind bypasses the lookup;
+ // the dev better know what they are doing...
if (!so.isFastBind() && consumerName != null) {
ConsumerInfo serverInfo = lookupConsumerInfo(settledStream, consumerName);
@@ -364,11 +385,14 @@ else if (!serverCC.getDeliverGroup().equals(deliverGroup)) {
throw JsSubExistingQueueDoesNotMatchRequestedQueue.instance();
}
- // durable already exists, make sure the filter subject matches
- if (nullOrEmpty(subscribeSubject)) { // allowed if they had given both stream and consumer name
- subscribeSubject = serverCC.getFilterSubject();
+ // consumer already exists, make sure the filter subject matches
+ // subscribeSubject, if supplied came from the user directly
+ // or in the userCC or might not have been in either place
+ if (userCcFilterSubjects.isEmpty()) {
+ // still also might be null, which the server treats as >
+ userCcFilterSubjects = serverCC.getFilterSubjects();
}
- else if (!isFilterMatch(subscribeSubject, serverCC.getFilterSubject(), settledStream)) {
+ else if (!listsAreEquivalent(userCcFilterSubjects, serverCC.getFilterSubjects())) {
throw JsSubSubjectDoesNotMatchFilter.instance();
}
@@ -379,7 +403,7 @@ else if (so.isBind()) {
}
}
- // 4. If pull or no deliver subject (inbox) provided or found, make an inbox.
+ // 5. If pull or no deliver subject (inbox) provided or found, make an inbox.
final String settledInboxDeliver;
if (isPullMode) {
settledInboxDeliver = conn.createInbox() + ".*";
@@ -391,13 +415,13 @@ else if (inboxDeliver == null) {
settledInboxDeliver = inboxDeliver;
}
- // 5. If consumer does not exist, create and settle on the config. Name will have to wait
+ // 6. If consumer does not exist, create and settle on the config. Name will have to wait
// If the consumer exists, I know what the settled info is
final String settledConsumerName;
final ConsumerConfiguration settledCC;
if (so.isFastBind() || serverCC != null) {
settledCC = serverCC;
- settledConsumerName = so.getName();
+ settledConsumerName = so.getName(); // will never be null in this case
}
else {
ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC);
@@ -407,19 +431,18 @@ else if (inboxDeliver == null) {
ccBuilder.deliverSubject(settledInboxDeliver);
}
- if (userCC.getFilterSubject() == null) {
- // this would happen if they use the api subscribe subject but didn't supply a
- // consumer config at all or their consumer config did not specify any filter subject
- ccBuilder.filterSubject(subscribeSubject);
- }
+ // userCC.filterSubjects might have originally been empty
+ // but there might have been a userSubscribeSubject,
+ // so this makes sure it's resolved either way
+ ccBuilder.filterSubjects(userCcFilterSubjects);
ccBuilder.deliverGroup(deliverGroup);
settledCC = ccBuilder.build();
- settledConsumerName = null;
+ settledConsumerName = null; // the server will give us a name
}
- // 6. create the subscription. lambda needs final or effectively final vars
+ // 7. create the subscription. lambda needs final or effectively final vars
final MessageManager mm;
final NatsSubscriptionFactory subFactory;
if (isPullMode) {
@@ -449,7 +472,7 @@ else if (inboxDeliver == null) {
sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(settledInboxDeliver, deliverGroup, handler, subFactory);
}
- // 7. The consumer might need to be created, do it here
+ // 8. The consumer might need to be created, do it here
if (settledConsumerName == null) {
_createConsumerUnsubscribeOnException(settledStream, settledCC, sub);
}
@@ -491,14 +514,14 @@ public List getChanges(ConsumerConfiguration serverCc) {
if (startTime != null && !startTime.equals(serverCcc.startTime)) { changes.add("startTime"); }
- if (!filterSubjects.isEmpty() && !notNullListsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); }
+ if (!filterSubjects.isEmpty() && !listsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); }
if (description != null && !description.equals(serverCcc.description)) { changes.add("description"); }
if (sampleFrequency != null && !sampleFrequency.equals(serverCcc.sampleFrequency)) { changes.add("sampleFrequency"); }
if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); }
if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); }
- if (backoff != null && !listsAreEqual(backoff, serverCcc.backoff, true)) { changes.add("backoff"); }
- if (metadata != null && !mapsAreEqual(metadata, serverCcc.metadata, true)) { changes.add("metadata"); }
+ if (backoff != null && !listsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); }
+ if (metadata != null && !mapsAreEquivalent(metadata, serverCcc.metadata)) { changes.add("metadata"); }
// do not need to check Durable because the original is retrieved by the durable name
@@ -528,24 +551,6 @@ public void onMessage(Message msg) throws InterruptedException {
}
}
- private boolean isFilterMatch(String subscribeSubject, String filterSubject, String stream) throws IOException, JetStreamApiException {
-
- // subscribeSubject guaranteed to not be null
- // filterSubject may be null or empty or have value
-
- if (subscribeSubject.equals(filterSubject)) {
- return true;
- }
-
- if (nullOrEmpty(filterSubject) || filterSubject.equals(">")) {
- // lookup stream subject returns null if there is not exactly one subject
- String streamSubject = lookupStreamSubject(stream);
- return subscribeSubject.equals(streamSubject);
- }
-
- return false;
- }
-
private String lookupStreamSubject(String stream) throws IOException, JetStreamApiException {
StreamInfo si = _getStreamInfo(stream, null);
List streamSubjects = si.getConfiguration().getSubjects();
@@ -556,101 +561,76 @@ private String lookupStreamSubject(String stream) throws IOException, JetStreamA
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException {
- subject = validateSubject(subject, true);
- return createSubscription(subject, null, null, null, false, null, null);
+ public JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException {
+ return createSubscription(subscribeSubject, null, null, null, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- subject = extractAndValidateSubject(subject, options);
- return createSubscription(subject, null, null, null, false, options, null);
+ public JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
+ return createSubscription(subscribeSubject, options, null, null, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- subject = extractAndValidateSubject(subject, options);
+ public JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
queue = validateQueueName(emptyAsNull(queue), false);
- return createSubscription(subject, queue, null, null, false, options, null);
+ return createSubscription(subscribeSubject, options, null, queue, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
- subject = validateSubject(subject, true);
+ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
- return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, autoAck, null, null);
+ return createSubscription(subscribeSubject, null, null, null, (NatsDispatcher) dispatcher, handler, autoAck);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- subject = extractAndValidateSubject(subject, options);
+ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
- return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, autoAck, options, null);
+ return createSubscription(subscribeSubject, options, null, null, (NatsDispatcher) dispatcher, handler, autoAck);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
- subject = extractAndValidateSubject(subject, options);
+ public JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
queue = validateQueueName(emptyAsNull(queue), false);
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
- return createSubscription(subject, queue, (NatsDispatcher) dispatcher, handler, autoAck, options, null);
+ return createSubscription(subscribeSubject, options, null, queue, (NatsDispatcher) dispatcher, handler, autoAck);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
- subject = extractAndValidateSubject(subject, options);
+ public JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
validateNotNull(options, "Pull Subscribe Options");
- return createSubscription(subject, null, null, null, false, null, options);
+ return createSubscription(subscribeSubject, null, options, null, null, null, false);
}
/**
* {@inheritDoc}
*/
@Override
- public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException {
- subject = extractAndValidateSubject(subject, options);
+ public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException {
validateNotNull(dispatcher, "Dispatcher");
validateNotNull(handler, "Handler");
validateNotNull(options, "Pull Subscribe Options");
- return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, false, null, options);
- }
-
- private String extractAndValidateSubject(String supplied, SubscribeOptions options) {
- if (supplied != null) {
- return validateSubject(supplied, isSubjectRequired(options));
- }
- if (options != null) {
- ConsumerConfiguration cc = options.getConsumerConfiguration();
- if (cc != null) {
- return validateSubject(cc.getFilterSubject(), isSubjectRequired(options));
- }
- }
- return validateSubject(null, isSubjectRequired(options));
- }
-
- private boolean isSubjectRequired(SubscribeOptions options) {
- return options == null || !options.isBind();
+ return createSubscription(subscribeSubject, null, options, null, (NatsDispatcher) dispatcher, handler, false);
}
/**
diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java
index 0544cf807..7d4d4bbbe 100644
--- a/src/main/java/io/nats/client/support/Validator.java
+++ b/src/main/java/io/nats/client/support/Validator.java
@@ -27,26 +27,57 @@ private Validator() {
} /* ensures cannot be constructed */
public static String validateSubject(String s, boolean required) {
- return validateSubject(s, "Subject", required, false);
+ return validateSubject(s, "Subject", required);
}
public static String validateSubject(String subject, String label, boolean required, boolean cantEndWithGt) {
+ subject = validateSubject(subject, label, required);
+ if (cantEndWithGt && subject.endsWith(".>")) {
+ throw new IllegalArgumentException(label + " last segment cannot be '>'");
+ }
+ return subject;
+ }
+
+ /*
+ cannot contain spaces \r \n \t
+ cannot start or with subject token delimiter .
+ some things don't allow it to end greater
+ */
+ public static String validateSubject(String subject, String label, boolean required) {
if (emptyAsNull(subject) == null) {
if (required) {
throw new IllegalArgumentException(label + " cannot be null or empty.");
}
return null;
}
+
+ subject = subject.trim();
String[] segments = subject.split("\\.");
- for (int x = 0; x < segments.length; x++) {
- String segment = segments[x];
- if (segment.equals(">")) {
- if (cantEndWithGt || x != segments.length - 1) { // if it can end with gt, gt must be last segment
- throw new IllegalArgumentException(label + " cannot contain '>'");
+ for (int seg = 0; seg < segments.length; seg++) {
+ String segment = segments[seg];
+ int sl = segment.length();
+ if (sl == 0) {
+ if (seg == 0) {
+ throw new IllegalArgumentException(label + " cannot start with '.'");
+ }
+ throw new IllegalArgumentException(label + " segment cannot be empty");
+ }
+ else {
+ for (int m = 0; m < sl; m++) {
+ char c = segment.charAt(m);
+ switch (c) {
+ case 32:
+ case '\r':
+ case '\n':
+ throw new IllegalArgumentException(label + " cannot contain space, carriage return or linefeed character");
+ case '*':
+ case '>':
+ if (sl != 1) {
+ throw new IllegalArgumentException(label + " wildcard improperly placed.");
+ }
+ break;
+ }
}
- }
- else if (!segment.equals("*") && notPrintable(segment)) {
- throw new IllegalArgumentException(label + " must be printable characters only.");
}
}
return subject;
@@ -561,57 +592,40 @@ public static boolean isSemVer(String s) {
return SEMVER_PATTERN.matcher(s).find();
}
- public static boolean listsAreEqual(List l1, List l2, boolean nullSecondEqualsEmptyFirst)
+ public static boolean listsAreEquivalent(List l1, List l2)
{
- if (l1 == null)
- {
- return l2 == null;
- }
-
- if (l2 == null)
- {
- return nullSecondEqualsEmptyFirst && l1.isEmpty();
- }
+ int s1 = l1 == null ? 0 : l1.size();
+ int s2 = l2 == null ? 0 : l2.size();
- return l1.equals(l2);
- }
-
- public static boolean notNullListsAreEquivalent(List l1, List l2)
- {
- if (l1.size() != l2.size()) {
+ if (s1 != s2) {
return false;
}
- for (T t : l1) {
- if (!l2.contains(t)) {
- return false;
+ if (s1 > 0) {
+ for (T t : l1) {
+ if (!l2.contains(t)) {
+ return false;
+ }
}
}
-
return true;
}
-
- public static boolean mapsAreEqual(Map m1, Map m2, boolean nullSecondEqualsEmptyFirst)
+ public static boolean mapsAreEquivalent(Map m1, Map m2)
{
- if (m1 == null)
- {
- return m2 == null;
- }
+ int s1 = m1 == null ? 0 : m1.size();
+ int s2 = m2 == null ? 0 : m2.size();
- if (m2 == null)
- {
- return nullSecondEqualsEmptyFirst && m1.isEmpty();
- }
-
- if (m1.size() != m2.size()) {
+ if (s1 != s2) {
return false;
}
- for (Map.Entry entry : m1.entrySet())
- {
- if (!entry.getValue().equals(m2.get(entry.getKey()))) {
- return false;
+ if (s1 > 0) {
+ for (Map.Entry entry : m1.entrySet())
+ {
+ if (!entry.getValue().equals(m2.get(entry.getKey()))) {
+ return false;
+ }
}
}
diff --git a/src/main/java/io/nats/service/Endpoint.java b/src/main/java/io/nats/service/Endpoint.java
index 339d7aaab..cbbf3d995 100644
--- a/src/main/java/io/nats/service/Endpoint.java
+++ b/src/main/java/io/nats/service/Endpoint.java
@@ -85,7 +85,7 @@ public Endpoint(String name, String subject, Map metadata) {
this.subject = this.name;
}
else {
- this.subject = validateSubject(subject, "Endpoint Subject", false, false);
+ this.subject = validateSubject(subject, "Endpoint Subject", false);
}
}
else {
diff --git a/src/main/java/io/nats/service/Group.java b/src/main/java/io/nats/service/Group.java
index c6d65ac39..0c692e067 100644
--- a/src/main/java/io/nats/service/Group.java
+++ b/src/main/java/io/nats/service/Group.java
@@ -16,6 +16,7 @@
import java.util.Objects;
import static io.nats.client.support.NatsConstants.DOT;
+import static io.nats.client.support.Validator.emptyAsNull;
import static io.nats.client.support.Validator.validateSubject;
/**
@@ -27,11 +28,20 @@ public class Group {
/**
* Construct a group.
- * Group names and subjects are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'
+ * Group names are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'
* @param name the group name
*/
public Group(String name) {
- this.name = validateSubject(name, "Group Name", true, true);
+ name = emptyAsNull(name);
+ if (name == null) {
+ throw new IllegalArgumentException("Group name cannot be null or empty.");
+ }
+
+ if (name.contains(">")) {
+ throw new IllegalArgumentException("Group name cannot contain '>'.");
+ }
+
+ this.name = validateSubject(name, "Group name", false);
}
/**
diff --git a/src/test/java/io/nats/client/JetStreamOptionsTests.java b/src/test/java/io/nats/client/JetStreamOptionsTests.java
index 0d248913a..b1388d783 100644
--- a/src/test/java/io/nats/client/JetStreamOptionsTests.java
+++ b/src/test/java/io/nats/client/JetStreamOptionsTests.java
@@ -103,8 +103,8 @@ public void testPrefixValidation() {
assertValidPrefix(HAS_TIC);
assertInvalidPrefix(HAS_SPACE);
- assertInvalidPrefix(HAS_STAR);
- assertInvalidPrefix(HAS_GT);
+ assertInvalidPrefix(STAR_NOT_SEGMENT);
+ assertInvalidPrefix(GT_NOT_SEGMENT);
assertInvalidPrefix(HAS_LOW);
assertInvalidPrefix(HAS_127);
@@ -145,8 +145,8 @@ public void testDomainValidation() {
assertValidDomain(HAS_TIC);
assertInvalidDomain(HAS_SPACE);
- assertInvalidDomain(HAS_STAR);
- assertInvalidDomain(HAS_GT);
+ assertInvalidDomain(STAR_NOT_SEGMENT);
+ assertInvalidDomain(GT_NOT_SEGMENT);
assertInvalidDomain(HAS_LOW);
assertInvalidDomain(HAS_127);
diff --git a/src/test/java/io/nats/client/SubscribeOptionsTests.java b/src/test/java/io/nats/client/SubscribeOptionsTests.java
index c1b27e9f2..3ca4b61a8 100644
--- a/src/test/java/io/nats/client/SubscribeOptionsTests.java
+++ b/src/test/java/io/nats/client/SubscribeOptionsTests.java
@@ -24,7 +24,7 @@
import static org.junit.jupiter.api.Assertions.*;
public class SubscribeOptionsTests extends TestBase {
- private static final String[] badNames = {HAS_DOT, HAS_GT, HAS_STAR, HAS_FWD_SLASH, HAS_BACK_SLASH};
+ private static final String[] badNames = {HAS_DOT, GT_NOT_SEGMENT, STAR_NOT_SEGMENT, HAS_FWD_SLASH, HAS_BACK_SLASH};
@Test
public void testPushAffirmative() {
diff --git a/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java b/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java
index d1cfb4eb3..b5f6f6090 100644
--- a/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java
+++ b/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java
@@ -84,8 +84,8 @@ public void testConstructionInvalidsCoverage() {
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_SPACE));
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_PRINTABLE));
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_DOT));
- assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_STAR));
- assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_GT));
+ assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(STAR_NOT_SEGMENT));
+ assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(GT_NOT_SEGMENT));
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_DOLLAR));
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_LOW));
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder(HAS_127));
diff --git a/src/test/java/io/nats/client/api/ObjectStoreApiTests.java b/src/test/java/io/nats/client/api/ObjectStoreApiTests.java
index 16836e8c8..18d1d7fb0 100644
--- a/src/test/java/io/nats/client/api/ObjectStoreApiTests.java
+++ b/src/test/java/io/nats/client/api/ObjectStoreApiTests.java
@@ -329,8 +329,8 @@ public void testConstructionInvalidsCoverage() {
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_SPACE));
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_PRINTABLE));
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_DOT));
- assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_STAR));
- assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_GT));
+ assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(STAR_NOT_SEGMENT));
+ assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(GT_NOT_SEGMENT));
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_DOLLAR));
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_LOW));
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder(HAS_127));
diff --git a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java
index ecd615096..cdc21c050 100644
--- a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java
+++ b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java
@@ -277,8 +277,6 @@ public void testJetStreamSubscribeErrors() throws Exception {
// subject
IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(null));
assertTrue(iae.getMessage().startsWith("Subject"));
- iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(HAS_SPACE));
- assertTrue(iae.getMessage().startsWith("Subject"));
iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(null, (PushSubscribeOptions)null));
assertTrue(iae.getMessage().startsWith("Subject"));
@@ -347,7 +345,7 @@ public void testFilterSubjectEphemeral() throws Exception {
// subscribe to A
cc = builder().filterSubject(subjectA).ackPolicy(AckPolicy.None).build();
pso = PushSubscribeOptions.builder().configuration(cc).build();
- sub = js.subscribe(subjectWild, pso);
+ sub = js.subscribe(subjectA, pso);
nc.flush(Duration.ofSeconds(1));
m = sub.nextMessage(Duration.ofSeconds(1));
@@ -362,7 +360,7 @@ public void testFilterSubjectEphemeral() throws Exception {
// subscribe to B
cc = builder().filterSubject(subjectB).ackPolicy(AckPolicy.None).build();
pso = PushSubscribeOptions.builder().configuration(cc).build();
- sub = js.subscribe(subjectWild, pso);
+ sub = js.subscribe(subjectB, pso);
nc.flush(Duration.ofSeconds(1));
m = sub.nextMessage(Duration.ofSeconds(1));
@@ -567,80 +565,74 @@ public void testFilterMismatchErrors() throws Exception {
createMemoryStream(jsm, STREAM, SUBJECT);
// will work as SubscribeSubject equals Filter Subject
- subscribeOk(js, jsm, SUBJECT, SUBJECT);
- subscribeOk(js, jsm, ">", ">");
- subscribeOk(js, jsm, "*", "*");
-
- // will work as SubscribeSubject != empty Filter Subject,
- // b/c Stream has exactly 1 subject and is a match.
- subscribeOk(js, jsm, "", SUBJECT);
-
- // will work as SubscribeSubject != Filter Subject of '>'
- // b/c Stream has exactly 1 subject and is a match.
- subscribeOk(js, jsm, ">", SUBJECT);
+ filterMatchSubscribeOk(js, jsm, SUBJECT, SUBJECT);
+ filterMatchSubscribeOk(js, jsm, ">", ">");
+ filterMatchSubscribeOk(js, jsm, "*", "*");
// will not work
- subscribeEx(js, jsm, "*", SUBJECT);
+ filterMatchSubscribeEx(js, jsm, SUBJECT, "");
+ filterMatchSubscribeEx(js, jsm, SUBJECT, ">");
+ filterMatchSubscribeEx(js, jsm, SUBJECT, "*");
// multiple subjects no wildcards
jsm.deleteStream(STREAM);
createMemoryStream(jsm, STREAM, SUBJECT, subject(2));
// will work as SubscribeSubject equals Filter Subject
- subscribeOk(js, jsm, SUBJECT, SUBJECT);
- subscribeOk(js, jsm, ">", ">");
- subscribeOk(js, jsm, "*", "*");
+ filterMatchSubscribeOk(js, jsm, SUBJECT, SUBJECT);
+ filterMatchSubscribeOk(js, jsm, ">", ">");
+ filterMatchSubscribeOk(js, jsm, "*", "*");
// will not work because stream has more than 1 subject
- subscribeEx(js, jsm, "", SUBJECT);
- subscribeEx(js, jsm, ">", SUBJECT);
- subscribeEx(js, jsm, "*", SUBJECT);
+ filterMatchSubscribeEx(js, jsm, SUBJECT, "");
+ filterMatchSubscribeEx(js, jsm, SUBJECT, ">");
+ filterMatchSubscribeEx(js, jsm, SUBJECT, "*");
// multiple subjects via '>'
jsm.deleteStream(STREAM);
createMemoryStream(jsm, STREAM, SUBJECT_GT);
// will work, exact matches
- subscribeOk(js, jsm, subjectDot("1"), subjectDot("1"));
- subscribeOk(js, jsm, ">", ">");
+ filterMatchSubscribeOk(js, jsm, subjectDot("1"), subjectDot("1"));
+ filterMatchSubscribeOk(js, jsm, ">", ">");
// will not work because mismatch / stream has more than 1 subject
- subscribeEx(js, jsm, "", subjectDot("1"));
- subscribeEx(js, jsm, ">", subjectDot("1"));
- subscribeEx(js, jsm, SUBJECT_GT, subjectDot("1"));
+ filterMatchSubscribeEx(js, jsm, subjectDot("1"), "");
+ filterMatchSubscribeEx(js, jsm, subjectDot("1"), ">");
+ filterMatchSubscribeEx(js, jsm, subjectDot("1"), SUBJECT_GT);
// multiple subjects via '*'
jsm.deleteStream(STREAM);
createMemoryStream(jsm, STREAM, SUBJECT_STAR);
// will work, exact matches
- subscribeOk(js, jsm, subjectDot("1"), subjectDot("1"));
- subscribeOk(js, jsm, ">", ">");
+ filterMatchSubscribeOk(js, jsm, subjectDot("1"), subjectDot("1"));
+ filterMatchSubscribeOk(js, jsm, ">", ">");
// will not work because mismatch / stream has more than 1 subject
- subscribeEx(js, jsm, "", subjectDot("1"));
- subscribeEx(js, jsm, ">", subjectDot("1"));
- subscribeEx(js, jsm, SUBJECT_STAR, subjectDot("1"));
+ filterMatchSubscribeEx(js, jsm, subjectDot("1"), "");
+ filterMatchSubscribeEx(js, jsm, subjectDot("1"), ">");
+ filterMatchSubscribeEx(js, jsm, subjectDot("1"), SUBJECT_STAR);
});
}
- private void subscribeOk(JetStream js, JetStreamManagement jsm, String fs, String ss) throws IOException, JetStreamApiException {
+ private void filterMatchSubscribeOk(JetStream js, JetStreamManagement jsm, String subscribeSubject, String... filterSubjects) throws IOException, JetStreamApiException {
int i = RandomUtils.PRAND.nextInt(); // just want a unique number
- setupConsumer(jsm, i, fs);
- unsubscribeEnsureNotBound(js.subscribe(ss, builder().durable(durable(i)).buildPushSubscribeOptions()));
+ filterMatchSetupConsumer(jsm, i, filterSubjects);
+ unsubscribeEnsureNotBound(js.subscribe(subscribeSubject, builder().durable(durable(i)).buildPushSubscribeOptions()));
}
- private void subscribeEx(JetStream js, JetStreamManagement jsm, String fs, String ss) throws IOException, JetStreamApiException {
+ private void filterMatchSubscribeEx(JetStream js, JetStreamManagement jsm, String subscribeSubject, String... filterSubjects) throws IOException, JetStreamApiException {
int i = RandomUtils.PRAND.nextInt(); // just want a unique number
- setupConsumer(jsm, i, fs);
+ filterMatchSetupConsumer(jsm, i, filterSubjects);
IllegalArgumentException iae = assertThrows(IllegalArgumentException.class,
- () -> js.subscribe(ss, builder().durable(durable(i)).buildPushSubscribeOptions()));
+ () -> js.subscribe(subscribeSubject, builder().durable(durable(i)).buildPushSubscribeOptions()));
assertTrue(iae.getMessage().contains(JsSubSubjectDoesNotMatchFilter.id()));
}
- private void setupConsumer(JetStreamManagement jsm, int i, String fs) throws IOException, JetStreamApiException {
+ private void filterMatchSetupConsumer(JetStreamManagement jsm, int i, String... fs) throws IOException, JetStreamApiException {
jsm.addOrUpdateConsumer(STREAM,
- builder().deliverSubject(deliver(i)).durable(durable(i)).filterSubject(fs).build());
+ builder().deliverSubject(deliver(i)).durable(durable(i)).filterSubjects(fs).build());
}
@Test
@@ -652,17 +644,19 @@ public void testBindDurableDeliverSubject() throws Exception {
// create the stream.
createDefaultTestStream(jsm);
- // create a durable push subscriber - has deliver subject
+ // create a durable push subscriber - has a deliver subject
ConsumerConfiguration ccDurPush = builder()
- .durable(durable(1))
- .deliverSubject(deliver(1))
- .build();
+ .durable(durable(1))
+ .deliverSubject(deliver(1))
+ .filterSubject(SUBJECT)
+ .build();
jsm.addOrUpdateConsumer(STREAM, ccDurPush);
// create a durable pull subscriber - notice no deliver subject
ConsumerConfiguration ccDurPull = builder()
- .durable(durable(2))
- .build();
+ .durable(durable(2))
+ .filterSubject(SUBJECT)
+ .build();
jsm.addOrUpdateConsumer(STREAM, ccDurPull);
// try to pull subscribe against a push durable
@@ -677,10 +671,6 @@ public void testBindDurableDeliverSubject() throws Exception {
);
assertTrue(iae.getMessage().contains(JsSubConsumerAlreadyConfiguredAsPush.id()));
- // this one is okay
- JetStreamSubscription sub = js.subscribe(SUBJECT, PullSubscribeOptions.builder().durable(durable(2)).build());
- unsubscribeEnsureNotBound(sub); // so I can re-use the durable
-
// try to push subscribe against a pull durable
iae = assertThrows(IllegalArgumentException.class,
() -> js.subscribe(SUBJECT, PushSubscribeOptions.builder().durable(durable(2)).build())
@@ -719,6 +709,7 @@ public void testConsumerIsNotModified() throws Exception {
.maxBatch(10)
.maxBytes(11)
.replayPolicy(ReplayPolicy.Instant)
+ .filterSubject(SUBJECT)
.build();
jsm.addOrUpdateConsumer(STREAM, cc);
@@ -736,6 +727,7 @@ public void testConsumerIsNotModified() throws Exception {
.maxBytes(48)
.rateLimit(44)
.maxAckPending(45)
+ .filterSubject(SUBJECT)
.build();
jsm.addOrUpdateConsumer(STREAM, cc);
@@ -745,6 +737,7 @@ public void testConsumerIsNotModified() throws Exception {
cc = builder()
.durable(durable(22))
.maxPullWaiting(46)
+ .filterSubject(SUBJECT)
.build();
jsm.addOrUpdateConsumer(STREAM, cc);
@@ -757,6 +750,7 @@ public void testConsumerIsNotModified() throws Exception {
.deliverSubject(deliver(3))
.durable(durable(3))
.startTime(ZonedDateTime.now().plusHours(1))
+ .filterSubject(SUBJECT)
.build();
jsm.addOrUpdateConsumer(STREAM, cc);
@@ -771,6 +765,7 @@ public void testConsumerIsNotModified() throws Exception {
.headersOnly(true)
.maxExpires(30000)
.ackWait(2000)
+ .filterSubject(SUBJECT)
.build();
jsm.addOrUpdateConsumer(STREAM, cc);
@@ -784,6 +779,7 @@ public void testConsumerIsNotModified() throws Exception {
.deliverPolicy(DeliverPolicy.Last)
.ackPolicy(AckPolicy.None)
.replayPolicy(ReplayPolicy.Original)
+ .filterSubject(SUBJECT)
.build();
jsm.addOrUpdateConsumer(STREAM, cc);
@@ -909,11 +905,11 @@ private void _changeEx(IllegalArgumentException iae, String changedField) {
}
private Builder pushDurableBuilder() {
- return builder().durable(PUSH_DURABLE).deliverSubject(DELIVER);
+ return builder().durable(PUSH_DURABLE).deliverSubject(DELIVER).filterSubject(SUBJECT);
}
private Builder pullDurableBuilder() {
- return builder().durable(PULL_DURABLE);
+ return builder().durable(PULL_DURABLE).filterSubject(SUBJECT);
}
@Test
diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java
index 5fc7bc043..4408b77b4 100644
--- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java
+++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java
@@ -121,13 +121,13 @@ public void testIterate() throws Exception {
Duration ackWaitDur = Duration.ofMillis(fetchMs * 2);
ConsumerConfiguration cc = ConsumerConfiguration.builder()
- .ackWait(ackWaitDur)
- .build();
+ .ackWait(ackWaitDur)
+ .build();
PullSubscribeOptions options = PullSubscribeOptions.builder()
- .durable(DURABLE)
- .configuration(cc)
- .build();
+ .durable(DURABLE)
+ .configuration(cc)
+ .build();
JetStreamSubscription sub = js.subscribe(SUBJECT, options);
assertSubscription(sub, STREAM, DURABLE, null, true);
@@ -638,7 +638,7 @@ public void testDurable() throws Exception {
@Test
public void testNamed() throws Exception {
- runInJsServer(nc -> {
+ runInJsServer(this::atLeast290, nc -> {
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();
@@ -1017,7 +1017,7 @@ public void testExceedsMaxRequestBytesNthMessageSyncSub() throws Exception {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();
- jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build());
+ jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).filterSubjects(SUBJECT).build());
PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, durable(1));
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
@@ -1049,7 +1049,7 @@ public void testExceedsMaxRequestBytesExactBytes() throws Exception {
createDefaultTestStream(nc);
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();
- jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build());
+ jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).filterSubjects(SUBJECT).build());
PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, durable(1));
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
@@ -1079,7 +1079,7 @@ public void testReader() throws Exception {
JetStream js = nc.jetStream();
// Pre define a consumer
- ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build();
+ ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).filterSubjects(SUBJECT).build();
jsm.addOrUpdateConsumer(STREAM, cc);
PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, DURABLE);
diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java
index e33979fb6..f12b35bad 100644
--- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java
+++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java
@@ -48,7 +48,11 @@ public class JetStreamTestBase extends TestBase {
public static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(1000);
public boolean atLeast290(Connection nc) {
- return nc.getServerInfo().isSameOrNewerThanVersion("2.9.0");
+ return atLeast290(nc.getServerInfo());
+ }
+
+ public boolean atLeast290(ServerInfo si) {
+ return si.isSameOrNewerThanVersion("2.9.0");
}
public boolean atLeast291(ServerInfo si) {
diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java
index 3c012a328..5f9d77d82 100644
--- a/src/test/java/io/nats/client/impl/SimplificationTests.java
+++ b/src/test/java/io/nats/client/impl/SimplificationTests.java
@@ -601,70 +601,80 @@ public void testOrderedBehaviors() throws Exception {
CreateStreamResult csr = createMemoryStream(jsm);
StreamContext sc = js.getStreamContext(csr.stream);
jsPublish(js, csr.subject, 101, 6);
- testOrderedBehaviorNext(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject));
- try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {};
- // Get this in place before subscriptions are made
- ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new;
-
- csr = createMemoryStream(jsm);
- sc = js.getStreamContext(csr.stream);
- jsPublish(js, csr.subject, 101, 6);
- testOrderedBehaviorFetch(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject));
- try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {};
-
- csr = createMemoryStream(jsm);
- sc = js.getStreamContext(csr.stream);
- jsPublish(js, csr.subject, 101, 6);
- testOrderedBehaviorIterable(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject));
- try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {};
- });
- }
-
- private static void testOrderedBehaviorNext(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception {
- OrderedConsumerContext ctx = sc.createOrderedConsumer(occ);
- // Loop through the messages to make sure I get stream sequence 1 to 6
- int expectedStreamSeq = 1;
- while (expectedStreamSeq <= 6) {
- Message m = ctx.next(1000);
- if (m != null) {
- assertEquals(expectedStreamSeq, m.metaData().streamSequence());
- assertEquals(1, m.metaData().consumerSequence());
- ++expectedStreamSeq;
- }
- }
- }
-
- private static void testOrderedBehaviorFetch(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception {
- OrderedConsumerContext ctx = sc.createOrderedConsumer(occ);
- try (FetchConsumer fcon = ctx.fetchMessages(6)) {
+ OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(csr.subject);
+ OrderedConsumerContext ctx = sc.createOrderedConsumer(occ);
// Loop through the messages to make sure I get stream sequence 1 to 6
int expectedStreamSeq = 1;
while (expectedStreamSeq <= 6) {
- Message m = fcon.nextMessage();
+ Message m = ctx.next(1000);
if (m != null) {
assertEquals(expectedStreamSeq, m.metaData().streamSequence());
- assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence());
+ assertEquals(1, m.metaData().consumerSequence());
++expectedStreamSeq;
}
}
- }
+ });
}
- private static void testOrderedBehaviorIterable(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception {
- OrderedConsumerContext ctx = sc.createOrderedConsumer(occ);
- try (IterableConsumer icon = ctx.iterate()) {
- // Loop through the messages to make sure I get stream sequence 1 to 6
- int expectedStreamSeq = 1;
- while (expectedStreamSeq <= 6) {
- Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage
- if (m != null) {
- assertEquals(expectedStreamSeq, m.metaData().streamSequence());
- assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence());
- ++expectedStreamSeq;
+ @Test
+ public void testOrderedBehaviorFetch() throws Exception {
+ runInJsServer(this::atLeast291, nc -> {
+ // Setup
+ JetStream js = nc.jetStream();
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ // Get this in place before subscriptions are made
+ ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new;
+
+ CreateStreamResult csr = createMemoryStream(jsm);
+ StreamContext sc = js.getStreamContext(csr.stream);
+ jsPublish(js, csr.subject, 101, 6);
+ OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(csr.subject);
+ OrderedConsumerContext ctx = sc.createOrderedConsumer(occ);
+ try (FetchConsumer fcon = ctx.fetchMessages(6)) {
+ // Loop through the messages to make sure I get stream sequence 1 to 6
+ int expectedStreamSeq = 1;
+ while (expectedStreamSeq <= 6) {
+ Message m = fcon.nextMessage();
+ if (m != null) {
+ assertEquals(expectedStreamSeq, m.metaData().streamSequence());
+ assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence());
+ ++expectedStreamSeq;
+ }
}
}
- }
+ });
+ }
+
+ @Test
+ public void testOrderedBehaviorIterable() throws Exception {
+ runInJsServer(this::atLeast291, nc -> {
+ // Setup
+ JetStream js = nc.jetStream();
+ JetStreamManagement jsm = nc.jetStreamManagement();
+
+ // Get this in place before subscriptions are made
+ ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new;
+
+ CreateStreamResult csr = createMemoryStream(jsm);
+ StreamContext sc = js.getStreamContext(csr.stream);
+ jsPublish(js, csr.subject, 101, 6);
+ OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(csr.subject);
+ OrderedConsumerContext ctx = sc.createOrderedConsumer(occ);
+ try (IterableConsumer icon = ctx.iterate()) {
+ // Loop through the messages to make sure I get stream sequence 1 to 6
+ int expectedStreamSeq = 1;
+ while (expectedStreamSeq <= 6) {
+ Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage
+ if (m != null) {
+ assertEquals(expectedStreamSeq, m.metaData().streamSequence());
+ assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence());
+ ++expectedStreamSeq;
+ }
+ }
+ }
+ });
}
@Test
diff --git a/src/test/java/io/nats/client/support/JsonParsingTests.java b/src/test/java/io/nats/client/support/JsonParsingTests.java
index 38e50df61..9f694b8d0 100644
--- a/src/test/java/io/nats/client/support/JsonParsingTests.java
+++ b/src/test/java/io/nats/client/support/JsonParsingTests.java
@@ -64,8 +64,8 @@ public void testStringParsing() {
addField(key(x++), HAS_SPACE, oMap, list, encodeds, decodeds);
addField(key(x++), HAS_PRINTABLE, oMap, list, encodeds, decodeds);
addField(key(x++), HAS_DOT, oMap, list, encodeds, decodeds);
- addField(key(x++), HAS_STAR, oMap, list, encodeds, decodeds);
- addField(key(x++), HAS_GT, oMap, list, encodeds, decodeds);
+ addField(key(x++), STAR_NOT_SEGMENT, oMap, list, encodeds, decodeds);
+ addField(key(x++), GT_NOT_SEGMENT, oMap, list, encodeds, decodeds);
addField(key(x++), HAS_DASH, oMap, list, encodeds, decodeds);
addField(key(x++), HAS_UNDER, oMap, list, encodeds, decodeds);
addField(key(x++), HAS_DOLLAR, oMap, list, encodeds, decodeds);
diff --git a/src/test/java/io/nats/client/support/ValidatorTests.java b/src/test/java/io/nats/client/support/ValidatorTests.java
index 5d6aedcb9..eddfe602e 100644
--- a/src/test/java/io/nats/client/support/ValidatorTests.java
+++ b/src/test/java/io/nats/client/support/ValidatorTests.java
@@ -35,19 +35,26 @@ public static void beforeAll() {
@Test
public void testValidateSubject() {
- allowedRequired(Validator::validateSubject, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_STAR, HAS_GT, HAS_DOLLAR));
- notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_LOW, HAS_127));
- notAllowedRequired(Validator::validateSubject, UTF_ONLY_STRINGS);
- allowedNotRequiredEmptyAsNull(Validator::validateSubject, Arrays.asList(null, EMPTY));
-
- notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_LOW, HAS_127));
+ // subject is required
+ allowedRequired(Validator::validateSubject, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_DOLLAR, HAS_LOW, HAS_127));
+ allowedRequired(Validator::validateSubject, UTF_ONLY_STRINGS);
+ allowedRequired(Validator::validateSubject, Arrays.asList(STAR_SEGMENT, GT_LAST_SEGMENT));
+ notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_CR, HAS_LF));
+ notAllowedRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT));
+
+ // subject not required, null and empty both mean not supplied
allowedNotRequiredEmptyAsNull(Validator::validateSubject, Arrays.asList(null, EMPTY));
+ allowedNotRequired(Validator::validateSubject, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_DOLLAR, HAS_LOW, HAS_127));
+ allowedNotRequired(Validator::validateSubject, UTF_ONLY_STRINGS);
+ allowedNotRequired(Validator::validateSubject, Arrays.asList(STAR_SEGMENT, GT_LAST_SEGMENT));
+ notAllowedNotRequired(Validator::validateSubject, Arrays.asList(HAS_SPACE, HAS_CR, HAS_LF));
+ notAllowedNotRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT));
}
@Test
public void testValidateReplyTo() {
allowedRequired(Validator::validateReplyTo, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_DOLLAR));
- notAllowedRequired(Validator::validateReplyTo, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_STAR, HAS_GT, HAS_LOW, HAS_127));
+ notAllowedRequired(Validator::validateReplyTo, Arrays.asList(null, EMPTY, HAS_SPACE, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127));
notAllowedRequired(Validator::validateReplyTo, UTF_ONLY_STRINGS);
allowedNotRequiredEmptyAsNull(Validator::validateReplyTo, Arrays.asList(null, EMPTY));
}
@@ -56,7 +63,7 @@ public void testValidateReplyTo() {
public void testValidateQueueName() {
// validateQueueName(String s, boolean required)
allowedRequired(Validator::validateQueueName, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOLLAR));
- notAllowedRequired(Validator::validateQueueName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, HAS_STAR, HAS_GT, HAS_LOW, HAS_127));
+ notAllowedRequired(Validator::validateQueueName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127));
notAllowedRequired(Validator::validateQueueName, UTF_ONLY_STRINGS);
allowedNotRequiredEmptyAsNull(Validator::validateQueueName, Arrays.asList(null, EMPTY));
}
@@ -64,7 +71,7 @@ public void testValidateQueueName() {
@Test
public void testValidateStreamName() {
allowedRequired(Validator::validateStreamName, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOLLAR));
- notAllowedRequired(Validator::validateStreamName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, HAS_STAR, HAS_GT, HAS_LOW, HAS_127));
+ notAllowedRequired(Validator::validateStreamName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127));
notAllowedRequired(Validator::validateStreamName, UTF_ONLY_STRINGS);
allowedNotRequiredEmptyAsNull(Validator::validateStreamName, Arrays.asList(null, EMPTY));
}
@@ -72,7 +79,7 @@ public void testValidateStreamName() {
@Test
public void testValidateDurable() {
allowedRequired(Validator::validateDurable, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOLLAR));
- notAllowedRequired(Validator::validateDurable, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, HAS_STAR, HAS_GT, HAS_LOW, HAS_127));
+ notAllowedRequired(Validator::validateDurable, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127));
notAllowedRequired(Validator::validateDurable, UTF_ONLY_STRINGS);
allowedNotRequiredEmptyAsNull(Validator::validateDurable, Arrays.asList(null, EMPTY));
}
@@ -82,8 +89,8 @@ public void testValidatePrintable() {
validatePrintable(PLAIN, "label", true);
validatePrintable(HAS_PRINTABLE, "label", true);
validatePrintable(HAS_DOT, "label", true);
- validatePrintable(HAS_STAR, "label", true);
- validatePrintable(HAS_GT, "label", true);
+ validatePrintable(STAR_NOT_SEGMENT, "label", true);
+ validatePrintable(GT_NOT_SEGMENT, "label", true);
validatePrintable(HAS_DASH, "label", true);
validatePrintable(HAS_UNDER, "label", true);
validatePrintable(HAS_DOLLAR, "label", true);
@@ -229,8 +236,8 @@ public void testValidateBucketName() {
assertThrows(IllegalArgumentException.class, () -> validateBucketName(null, true));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_SPACE, true));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOT, true));
- assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_STAR, true));
- assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_GT, true));
+ assertThrows(IllegalArgumentException.class, () -> validateBucketName(STAR_NOT_SEGMENT, true));
+ assertThrows(IllegalArgumentException.class, () -> validateBucketName(GT_NOT_SEGMENT, true));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOLLAR, true));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_LOW, true));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_127, true));
@@ -246,8 +253,8 @@ public void testValidateBucketName() {
validateBucketName(null, false);
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_SPACE, false));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOT, false));
- assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_STAR, false));
- assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_GT, false));
+ assertThrows(IllegalArgumentException.class, () -> validateBucketName(STAR_NOT_SEGMENT, false));
+ assertThrows(IllegalArgumentException.class, () -> validateBucketName(GT_NOT_SEGMENT, false));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOLLAR, false));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_LOW, false));
assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_127, false));
@@ -265,8 +272,8 @@ public void testValidateWildcardKeyRequired() {
validateKvKeyWildcardAllowedRequired(HAS_FWD_SLASH);
validateKvKeyWildcardAllowedRequired(HAS_EQUALS);
validateKvKeyWildcardAllowedRequired(HAS_DOT);
- validateKvKeyWildcardAllowedRequired(HAS_STAR);
- validateKvKeyWildcardAllowedRequired(HAS_GT);
+ validateKvKeyWildcardAllowedRequired(STAR_NOT_SEGMENT);
+ validateKvKeyWildcardAllowedRequired(GT_NOT_SEGMENT);
validateKvKeyWildcardAllowedRequired("numbers9ok");
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(null));
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_SPACE));
@@ -290,8 +297,8 @@ public void testValidateNonWildcardKeyRequired() {
validateNonWildcardKvKeyRequired("numbers9ok");
assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(null));
assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_SPACE));
- assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_STAR));
- assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_GT));
+ assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(STAR_NOT_SEGMENT));
+ assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(GT_NOT_SEGMENT));
assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_DOLLAR));
assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_LOW));
assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_127));
@@ -413,15 +420,27 @@ private void allowedRequired(StringTest test, List strings) {
}
}
+ private void notAllowedRequired(StringTest test, List strings) {
+ for (String s : strings) {
+ assertThrows(IllegalArgumentException.class, () -> test.validate(s, true), notAllowedMessage(s));
+ }
+ }
+
+ private void allowedNotRequired(StringTest test, List strings) {
+ for (String s : strings) {
+ assertEquals(s, test.validate(s, false), allowedMessage(s));
+ }
+ }
+
private void allowedNotRequiredEmptyAsNull(StringTest test, List strings) {
for (String s : strings) {
assertNull(test.validate(s, false), allowedMessage(s));
}
}
- private void notAllowedRequired(StringTest test, List strings) {
+ private void notAllowedNotRequired(StringTest test, List strings) {
for (String s : strings) {
- assertThrows(IllegalArgumentException.class, () -> test.validate(s, true), notAllowedMessage(s));
+ assertThrows(IllegalArgumentException.class, () -> test.validate(s, false), notAllowedMessage(s));
}
}
@@ -526,29 +545,53 @@ public void testSemver() {
@Test
public void testListsAreEqual() {
List l1 = Arrays.asList("one", "two");
- List l2 = Arrays.asList("one", "two");
- List l3 = Collections.singletonList("three");
- List l4 = null;
+ List l2 = Arrays.asList("two", "one");
+ List l3 = Arrays.asList("one", "not");
+ List l4 = Collections.singletonList("three");
List l5 = null;
List l6 = new ArrayList<>();
- assertTrue(listsAreEqual(l1, l1, true));
- assertTrue(listsAreEqual(l1, l1, false));
- assertTrue(listsAreEqual(l1, l2, true));
- assertTrue(listsAreEqual(l1, l2, false));
- assertFalse(listsAreEqual(l1, l3, true));
- assertFalse(listsAreEqual(l1, l3, false));
-
- assertFalse(listsAreEqual(l4, l1, true));
- assertFalse(listsAreEqual(l4, l1, false));
-
- assertTrue(listsAreEqual(l4, l5, true));
- assertTrue(listsAreEqual(l4, l5, false));
- assertFalse(listsAreEqual(l4, l6, true));
- assertFalse(listsAreEqual(l4, l6, false));
-
- assertTrue(listsAreEqual(l6, l4, true));
- assertFalse(listsAreEqual(l6, l4, false));
+ assertTrue(listsAreEquivalent(l1, l1));
+ assertTrue(listsAreEquivalent(l1, l2));
+ assertFalse(listsAreEquivalent(l1, l3));
+ assertFalse(listsAreEquivalent(l1, l4));
+ assertFalse(listsAreEquivalent(l1, l5));
+ assertFalse(listsAreEquivalent(l1, l6));
+
+ assertTrue(listsAreEquivalent(l2, l1));
+ assertTrue(listsAreEquivalent(l2, l2));
+ assertFalse(listsAreEquivalent(l2, l3));
+ assertFalse(listsAreEquivalent(l2, l4));
+ assertFalse(listsAreEquivalent(l2, l5));
+ assertFalse(listsAreEquivalent(l2, l6));
+
+ assertFalse(listsAreEquivalent(l3, l1));
+ assertFalse(listsAreEquivalent(l3, l2));
+ assertTrue(listsAreEquivalent(l3, l3));
+ assertFalse(listsAreEquivalent(l3, l4));
+ assertFalse(listsAreEquivalent(l3, l5));
+ assertFalse(listsAreEquivalent(l3, l6));
+
+ assertFalse(listsAreEquivalent(l4, l1));
+ assertFalse(listsAreEquivalent(l4, l2));
+ assertFalse(listsAreEquivalent(l4, l3));
+ assertTrue(listsAreEquivalent(l4, l4));
+ assertFalse(listsAreEquivalent(l4, l5));
+ assertFalse(listsAreEquivalent(l4, l6));
+
+ assertFalse(listsAreEquivalent(l5, l1));
+ assertFalse(listsAreEquivalent(l5, l2));
+ assertFalse(listsAreEquivalent(l5, l3));
+ assertFalse(listsAreEquivalent(l5, l4));
+ assertTrue(listsAreEquivalent(l5, l5));
+ assertTrue(listsAreEquivalent(l5, l6));
+
+ assertFalse(listsAreEquivalent(l6, l1));
+ assertFalse(listsAreEquivalent(l6, l2));
+ assertFalse(listsAreEquivalent(l6, l3));
+ assertFalse(listsAreEquivalent(l6, l4));
+ assertTrue(listsAreEquivalent(l6, l5));
+ assertTrue(listsAreEquivalent(l6, l6));
}
@Test
@@ -562,37 +605,73 @@ public void testMapsAreEqual() {
m2.put("one", "1");
Map m3 = new HashMap<>();
- m3.put("three", "3");
- m3.put("two", "4");
+ m3.put("one", "1");
+ m3.put("two", "not");
Map m4 = new HashMap<>();
- m3.put("four", "4");
+ m4.put("one", "1");
+ m4.put("not", "not");
+
+ Map m5 = new HashMap<>();
+ m5.put("five", "5");
- Map m5 = null;
Map m6 = null;
Map m7 = new HashMap<>();
- assertTrue(mapsAreEqual(m1, m1, true));
- assertTrue(mapsAreEqual(m1, m1, false));
- assertTrue(mapsAreEqual(m1, m2, true));
- assertTrue(mapsAreEqual(m1, m2, false));
- assertFalse(mapsAreEqual(m1, m3, true));
- assertFalse(mapsAreEqual(m1, m3, false));
- assertFalse(mapsAreEqual(m1, m4, true));
- assertFalse(mapsAreEqual(m1, m4, false));
- assertFalse(mapsAreEqual(m1, m5, true));
- assertFalse(mapsAreEqual(m1, m5, false));
-
- assertFalse(mapsAreEqual(m5, m1, true));
- assertFalse(mapsAreEqual(m5, m1, false));
-
- assertTrue(mapsAreEqual(m5, m6, true));
- assertTrue(mapsAreEqual(m5, m6, false));
- assertFalse(mapsAreEqual(m5, m7, true));
- assertFalse(mapsAreEqual(m5, m7, false));
-
- assertTrue(mapsAreEqual(m7, m5, true));
- assertFalse(mapsAreEqual(m7, m5, false));
+ assertTrue(mapsAreEquivalent(m1, m1));
+ assertTrue(mapsAreEquivalent(m1, m2));
+ assertFalse(mapsAreEquivalent(m1, m3));
+ assertFalse(mapsAreEquivalent(m1, m4));
+ assertFalse(mapsAreEquivalent(m1, m5));
+ assertFalse(mapsAreEquivalent(m1, m6));
+ assertFalse(mapsAreEquivalent(m1, m7));
+
+ assertTrue(mapsAreEquivalent(m2, m1));
+ assertTrue(mapsAreEquivalent(m2, m2));
+ assertFalse(mapsAreEquivalent(m2, m3));
+ assertFalse(mapsAreEquivalent(m2, m4));
+ assertFalse(mapsAreEquivalent(m2, m5));
+ assertFalse(mapsAreEquivalent(m2, m6));
+ assertFalse(mapsAreEquivalent(m2, m7));
+
+ assertFalse(mapsAreEquivalent(m3, m1));
+ assertFalse(mapsAreEquivalent(m3, m2));
+ assertTrue(mapsAreEquivalent(m3, m3));
+ assertFalse(mapsAreEquivalent(m3, m4));
+ assertFalse(mapsAreEquivalent(m3, m5));
+ assertFalse(mapsAreEquivalent(m3, m6));
+ assertFalse(mapsAreEquivalent(m3, m7));
+
+ assertFalse(mapsAreEquivalent(m4, m1));
+ assertFalse(mapsAreEquivalent(m4, m2));
+ assertFalse(mapsAreEquivalent(m4, m3));
+ assertTrue(mapsAreEquivalent(m4, m4));
+ assertFalse(mapsAreEquivalent(m4, m5));
+ assertFalse(mapsAreEquivalent(m4, m6));
+ assertFalse(mapsAreEquivalent(m4, m7));
+
+ assertFalse(mapsAreEquivalent(m5, m1));
+ assertFalse(mapsAreEquivalent(m5, m2));
+ assertFalse(mapsAreEquivalent(m5, m3));
+ assertFalse(mapsAreEquivalent(m5, m4));
+ assertTrue(mapsAreEquivalent(m5, m5));
+ assertFalse(mapsAreEquivalent(m5, m6));
+ assertFalse(mapsAreEquivalent(m5, m7));
+
+ assertFalse(mapsAreEquivalent(m6, m1));
+ assertFalse(mapsAreEquivalent(m6, m2));
+ assertFalse(mapsAreEquivalent(m6, m3));
+ assertFalse(mapsAreEquivalent(m6, m4));
+ assertFalse(mapsAreEquivalent(m6, m5));
+ assertTrue(mapsAreEquivalent(m6, m6));
+ assertTrue(mapsAreEquivalent(m6, m7));
+
+ assertFalse(mapsAreEquivalent(m7, m1));
+ assertFalse(mapsAreEquivalent(m7, m2));
+ assertFalse(mapsAreEquivalent(m7, m3));
+ assertFalse(mapsAreEquivalent(m7, m5));
+ assertTrue(mapsAreEquivalent(m7, m6));
+ assertTrue(mapsAreEquivalent(m7, m7));
}
}
\ No newline at end of file
diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java
index 4fe2302d6..2143978b5 100644
--- a/src/test/java/io/nats/client/utils/TestBase.java
+++ b/src/test/java/io/nats/client/utils/TestBase.java
@@ -38,16 +38,22 @@
public class TestBase {
+ public static final String STAR_SEGMENT = "*.star.*.segment.*";
+ public static final String GT_LAST_SEGMENT = "gt.last.>";
+ public static final String STARTS_WITH_DOT = ".starts-with-dot";
+ public static final String STAR_NOT_SEGMENT = "star*not*segment";
+ public static final String GT_NOT_SEGMENT = "gt>not>segment";
+
public static final String PLAIN = "plain";
public static final String HAS_SPACE = "has space";
public static final String HAS_PRINTABLE = "has-print!able";
public static final String HAS_DOT = "has.dot";
- public static final String HAS_STAR = "has*star";
- public static final String HAS_GT = "has>gt";
public static final String HAS_DASH = "has-dash";
public static final String HAS_UNDER = "has_under";
public static final String HAS_DOLLAR = "has$dollar";
- public static final String HAS_LOW = "has\tlower\rthan\nspace";
+ public static final String HAS_CR = "has\rcr";
+ public static final String HAS_LF = "has\nlf";
+ public static final String HAS_LOW = "has" + (char)0 + "low";
public static final String HAS_127 = "has" + (char)127 + "127";
public static final String HAS_FWD_SLASH = "has/fwd/slash";
public static final String HAS_BACK_SLASH = "has\\back\\slash";
diff --git a/src/test/java/io/nats/service/ServiceTests.java b/src/test/java/io/nats/service/ServiceTests.java
index db9b0a8b3..45494565a 100644
--- a/src/test/java/io/nats/service/ServiceTests.java
+++ b/src/test/java/io/nats/service/ServiceTests.java
@@ -537,8 +537,8 @@ public void testServiceBuilderConstruction() {
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_SPACE));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_PRINTABLE));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOT));
- assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_STAR)); // invalid in the middle
- assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_GT)); // invalid in the middle
+ assertThrows(IllegalArgumentException.class, () -> Service.builder().name(STAR_NOT_SEGMENT)); // invalid in the middle
+ assertThrows(IllegalArgumentException.class, () -> Service.builder().name(GT_NOT_SEGMENT)); // invalid in the middle
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOLLAR));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_LOW));
assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_127));
@@ -744,14 +744,14 @@ public void testEndpointConstruction() {
assertTrue(JsonUtils.mapEquals(metadata, e.getMetadata()));
assertThrows(IllegalArgumentException.class, () -> Endpoint.builder().build());
- // many names are bad
+ // many names are bad and is required
assertThrows(IllegalArgumentException.class, () -> new Endpoint((String) null));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(EMPTY));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_SPACE));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_PRINTABLE));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_DOT));
- assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_STAR)); // invalid in the middle
- assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_GT)); // invalid in the middle
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(STAR_NOT_SEGMENT)); // invalid in the middle
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(GT_NOT_SEGMENT)); // invalid in the middle
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_DOLLAR));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_LOW));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_127));
@@ -760,11 +760,13 @@ public void testEndpointConstruction() {
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_EQUALS));
assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_TIC));
- // fewer subjects are bad
+ // typical subjects are bad
assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_SPACE));
- assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_LOW));
- assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_127));
- assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, "foo.>.bar")); // gt is not last segment
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_CR));
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_LF));
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, STAR_NOT_SEGMENT));
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, GT_NOT_SEGMENT));
+ assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, STARTS_WITH_DOT));
}
@Test
@@ -847,11 +849,12 @@ public void testGroupConstruction() {
g1 = new Group("foo.*");
assertEquals("foo.*", g1.getName());
+ assertThrows(IllegalArgumentException.class, () -> new Group(null));
+ assertThrows(IllegalArgumentException.class, () -> new Group(EMPTY));
assertThrows(IllegalArgumentException.class, () -> new Group(HAS_SPACE));
- assertThrows(IllegalArgumentException.class, () -> new Group(HAS_LOW));
- assertThrows(IllegalArgumentException.class, () -> new Group(HAS_127));
- assertThrows(IllegalArgumentException.class, () -> new Group("foo.>")); // gt is last segment
- assertThrows(IllegalArgumentException.class, () -> new Group("foo.>.bar")); // gt is not last segment
+ assertThrows(IllegalArgumentException.class, () -> new Group(STAR_NOT_SEGMENT)); // invalid in the middle
+ assertThrows(IllegalArgumentException.class, () -> new Group("foo.>")); // GT invalid everywhere
+ assertThrows(IllegalArgumentException.class, () -> new Group(GT_NOT_SEGMENT)); // GT invalid everywhere
EqualsVerifier.simple().forClass(Group.class).withPrefabValues(Group.class, g1, g2).verify();
}