From b28125405a0ca412d4765a7191a370b18aa9575d Mon Sep 17 00:00:00 2001 From: scottf Date: Fri, 8 Sep 2023 13:31:10 -0400 Subject: [PATCH] applying new rules for subject valdiation --- src/main/java/io/nats/client/JetStream.java | 33 ++- .../client/api/ConsumerConfiguration.java | 6 +- .../io/nats/client/impl/NatsJetStream.java | 156 ++++++------- .../io/nats/client/support/Validator.java | 104 +++++---- src/main/java/io/nats/service/Endpoint.java | 2 +- src/main/java/io/nats/service/Group.java | 14 +- .../io/nats/client/JetStreamOptionsTests.java | 8 +- .../io/nats/client/SubscribeOptionsTests.java | 2 +- .../api/KeyValueConfigurationTests.java | 4 +- .../nats/client/api/ObjectStoreApiTests.java | 4 +- .../client/impl/JetStreamGeneralTests.java | 100 ++++---- .../nats/client/impl/JetStreamPullTests.java | 18 +- .../nats/client/impl/JetStreamTestBase.java | 6 +- .../nats/client/impl/SimplificationTests.java | 112 ++++----- .../nats/client/support/JsonParsingTests.java | 4 +- .../nats/client/support/ValidatorTests.java | 213 ++++++++++++------ .../java/io/nats/client/utils/TestBase.java | 12 +- .../java/io/nats/service/ServiceTests.java | 29 +-- 18 files changed, 466 insertions(+), 361 deletions(-) diff --git a/src/main/java/io/nats/client/JetStream.java b/src/main/java/io/nats/client/JetStream.java index b26d40d5e..2f2da8a3e 100644 --- a/src/main/java/io/nats/client/JetStream.java +++ b/src/main/java/io/nats/client/JetStream.java @@ -358,13 +358,13 @@ public interface JetStream { *

See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for * information about creating an asynchronous subscription with callbacks. * - * @param subject the subject to subscribe to + * @param subscribeSubject the subject to subscribe to * @return The subscription * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException; /** * Create a synchronous subscription to the specified subject. @@ -375,14 +375,14 @@ public interface JetStream { *

See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for * information about creating an asynchronous subscription with callbacks. * - * @param subject the subject to subscribe to + * @param subscribeSubject the subject to subscribe to * @param options optional subscription options * @return The subscription * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException; /** * Create a synchronous subscription to the specified subject. @@ -393,7 +393,7 @@ public interface JetStream { *

See {@link io.nats.client.Connection#createDispatcher(MessageHandler) createDispatcher} for * information about creating an asynchronous subscription with callbacks. * - * @param subject the subject to subscribe to + * @param subscribeSubject the subject to subscribe to * @param queue the optional queue group to join * @param options optional subscription options * @return The subscription @@ -401,14 +401,14 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException; /** * Create an asynchronous subscription to the specified subject under the control of the * specified dispatcher. Since a MessageHandler is also required, the Dispatcher will * not prevent duplicate subscriptions from being made. * - * @param subject The subject to subscribe to + * @param subscribeSubject The subject to subscribe to * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages * @param autoAck Whether to auto ack @@ -417,14 +417,13 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException; /** * Create an asynchronous subscription to the specified subject under the control of the * specified dispatcher. Since a MessageHandler is also required, the Dispatcher will * not prevent duplicate subscriptions from being made. - * - * @param subject The subject to subscribe to. + * @param subscribeSubject The subject to subscribe to. * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages * @param autoAck Whether to auto ack @@ -434,14 +433,14 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException; /** * Create an asynchronous subscription to the specified subject under the control of the * specified dispatcher. Since a MessageHandler is also required, the Dispatcher will * not prevent duplicate subscriptions from being made. * - * @param subject The subject to subscribe to. + * @param subscribeSubject The subject to subscribe to. * @param queue the optional queue group to join * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages @@ -452,22 +451,22 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException; /** * Create a subscription to the specified subject in the mode of pull, with additional options - * @param subject The subject to subscribe to + * @param subscribeSubject The subject to subscribe to * @param options pull subscription options * @return The subscription * @throws IOException covers various communication issues with the NATS * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException; /** * Create an asynchronous subscription to the specified subject in the mode of pull, with additional options - * @param subject The subject to subscribe to + * @param subscribeSubject The subject to subscribe to * @param dispatcher The dispatcher to handle this subscription * @param handler The target for the messages * @param options pull subscription options @@ -476,7 +475,7 @@ public interface JetStream { * server such as timeout or interruption * @throws JetStreamApiException the request had an error related to the data */ - JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException; + JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException; /** * Get a stream context for a specific named stream. Verifies that the stream exists. diff --git a/src/main/java/io/nats/client/api/ConsumerConfiguration.java b/src/main/java/io/nats/client/api/ConsumerConfiguration.java index 23223e111..8a16c76d1 100644 --- a/src/main/java/io/nats/client/api/ConsumerConfiguration.java +++ b/src/main/java/io/nats/client/api/ConsumerConfiguration.java @@ -852,7 +852,11 @@ public Builder maxDeliver(long maxDeliver) { * @return Builder */ public Builder filterSubject(String filterSubject) { - return filterSubjects(Collections.singletonList(filterSubject)); + this.filterSubjects.clear(); + if (!nullOrEmpty(filterSubject)) { + this.filterSubjects.add(filterSubject); + } + return this; } diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index bafb1c1ec..e85e1adfa 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -234,22 +234,21 @@ MessageManager createMessageManager( MessageManagerFactory _pullOrderedMessageManagerFactory = (mmConn, mmJs, mmStream, mmSo, mmCc, mmQueueMode, mmSyncMode) -> new OrderedPullMessageManager(mmConn, mmJs, mmStream, mmSo, mmCc, mmSyncMode); - JetStreamSubscription createSubscription(String subscribeSubject, + JetStreamSubscription createSubscription(String userSubscribeSubject, + PushSubscribeOptions pushSubscribeOptions, + PullSubscribeOptions pullSubscribeOptions, String queueName, NatsDispatcher dispatcher, MessageHandler userHandler, - boolean isAutoAck, - PushSubscribeOptions pushSubscribeOptions, - PullSubscribeOptions pullSubscribeOptions + boolean isAutoAck ) throws IOException, JetStreamApiException { - // 0. parameter notes. For those relating to the callers, you can see all the callers further down in this source file. - // - subscribeSubject will have been resolved by the caller from their method parameter or the consumer config + // Parameter notes. For those relating to the callers, you can see all the callers further down in this source file. // - pull subscribe callers guarantee that pullSubscribeOptions is not null // - qgroup is always null with pull callers // - callers only ever provide one of the subscribe options - // 1. Prepare for all the validation + // 1. Initial prep and validation boolean isPullMode = pullSubscribeOptions != null; SubscribeOptions so; @@ -290,7 +289,7 @@ JetStreamSubscription createSubscription(String subscribeSubject, } } - // 2A. Flow Control / heartbeat not always valid + // 1B. Flow Control / heartbeat not always valid if (userCC.getIdleHeartbeat() != null && userCC.getIdleHeartbeat().toMillis() > 0) { if (isPullMode) { throw JsSubFcHbNotValidPull.instance(); @@ -300,10 +299,31 @@ JetStreamSubscription createSubscription(String subscribeSubject, } } - // 2B. Did they tell me what stream? No? look it up. + // 2. figure out user provided subjects and prepare the userCcFilterSubjects + userSubscribeSubject = emptyAsNull(userSubscribeSubject); + List userCcFilterSubjects = new ArrayList<>(); + if (userCC.getFilterSubject() == null) { // empty filterSubjects gives gives null + // userCC.filterSubjects empty, populate userCcFilterSubjects w/userSubscribeSubject if possible + if (userSubscribeSubject != null) { + userCcFilterSubjects.add(userSubscribeSubject); + } + } + else { + // userCC.filterSubjects not empty, validate them + userCcFilterSubjects.addAll(userCC.getFilterSubjects()); + // If userSubscribeSubject is provided it must be one of the filter subjects. + if (userSubscribeSubject != null && !userCcFilterSubjects.contains(userSubscribeSubject)) { + throw JsSubSubjectDoesNotMatchFilter.instance(); + } + } + + // 3. Did they tell me what stream? No? look it up. final String settledStream; if (stream == null) { - settledStream = lookupStreamBySubject(subscribeSubject); + if (userCcFilterSubjects.isEmpty()) { + throw new IllegalArgumentException("Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject."); + } + settledStream = lookupStreamBySubject(userCcFilterSubjects.get(0)); if (settledStream == null) { throw JsSubNoMatchingStreamForSubject.instance(); } @@ -319,7 +339,8 @@ JetStreamSubscription createSubscription(String subscribeSubject, } String inboxDeliver = userCC.getDeliverSubject(); - // 3. Does this consumer already exist? FastBind bypasses the lookup; the dev better know what they are doing + // 4. Does this consumer already exist? FastBind bypasses the lookup; + // the dev better know what they are doing... if (!so.isFastBind() && consumerName != null) { ConsumerInfo serverInfo = lookupConsumerInfo(settledStream, consumerName); @@ -364,11 +385,14 @@ else if (!serverCC.getDeliverGroup().equals(deliverGroup)) { throw JsSubExistingQueueDoesNotMatchRequestedQueue.instance(); } - // durable already exists, make sure the filter subject matches - if (nullOrEmpty(subscribeSubject)) { // allowed if they had given both stream and consumer name - subscribeSubject = serverCC.getFilterSubject(); + // consumer already exists, make sure the filter subject matches + // subscribeSubject, if supplied came from the user directly + // or in the userCC or might not have been in either place + if (userCcFilterSubjects.isEmpty()) { + // still also might be null, which the server treats as > + userCcFilterSubjects = serverCC.getFilterSubjects(); } - else if (!isFilterMatch(subscribeSubject, serverCC.getFilterSubject(), settledStream)) { + else if (!listsAreEquivalent(userCcFilterSubjects, serverCC.getFilterSubjects())) { throw JsSubSubjectDoesNotMatchFilter.instance(); } @@ -379,7 +403,7 @@ else if (so.isBind()) { } } - // 4. If pull or no deliver subject (inbox) provided or found, make an inbox. + // 5. If pull or no deliver subject (inbox) provided or found, make an inbox. final String settledInboxDeliver; if (isPullMode) { settledInboxDeliver = conn.createInbox() + ".*"; @@ -391,13 +415,13 @@ else if (inboxDeliver == null) { settledInboxDeliver = inboxDeliver; } - // 5. If consumer does not exist, create and settle on the config. Name will have to wait + // 6. If consumer does not exist, create and settle on the config. Name will have to wait // If the consumer exists, I know what the settled info is final String settledConsumerName; final ConsumerConfiguration settledCC; if (so.isFastBind() || serverCC != null) { settledCC = serverCC; - settledConsumerName = so.getName(); + settledConsumerName = so.getName(); // will never be null in this case } else { ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC); @@ -407,19 +431,18 @@ else if (inboxDeliver == null) { ccBuilder.deliverSubject(settledInboxDeliver); } - if (userCC.getFilterSubject() == null) { - // this would happen if they use the api subscribe subject but didn't supply a - // consumer config at all or their consumer config did not specify any filter subject - ccBuilder.filterSubject(subscribeSubject); - } + // userCC.filterSubjects might have originally been empty + // but there might have been a userSubscribeSubject, + // so this makes sure it's resolved either way + ccBuilder.filterSubjects(userCcFilterSubjects); ccBuilder.deliverGroup(deliverGroup); settledCC = ccBuilder.build(); - settledConsumerName = null; + settledConsumerName = null; // the server will give us a name } - // 6. create the subscription. lambda needs final or effectively final vars + // 7. create the subscription. lambda needs final or effectively final vars final MessageManager mm; final NatsSubscriptionFactory subFactory; if (isPullMode) { @@ -449,7 +472,7 @@ else if (inboxDeliver == null) { sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(settledInboxDeliver, deliverGroup, handler, subFactory); } - // 7. The consumer might need to be created, do it here + // 8. The consumer might need to be created, do it here if (settledConsumerName == null) { _createConsumerUnsubscribeOnException(settledStream, settledCC, sub); } @@ -491,14 +514,14 @@ public List getChanges(ConsumerConfiguration serverCc) { if (startTime != null && !startTime.equals(serverCcc.startTime)) { changes.add("startTime"); } - if (!filterSubjects.isEmpty() && !notNullListsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); } + if (!filterSubjects.isEmpty() && !listsAreEquivalent(filterSubjects, serverCcc.filterSubjects)) { changes.add("filterSubjects"); } if (description != null && !description.equals(serverCcc.description)) { changes.add("description"); } if (sampleFrequency != null && !sampleFrequency.equals(serverCcc.sampleFrequency)) { changes.add("sampleFrequency"); } if (deliverSubject != null && !deliverSubject.equals(serverCcc.deliverSubject)) { changes.add("deliverSubject"); } if (deliverGroup != null && !deliverGroup.equals(serverCcc.deliverGroup)) { changes.add("deliverGroup"); } - if (backoff != null && !listsAreEqual(backoff, serverCcc.backoff, true)) { changes.add("backoff"); } - if (metadata != null && !mapsAreEqual(metadata, serverCcc.metadata, true)) { changes.add("metadata"); } + if (backoff != null && !listsAreEquivalent(backoff, serverCcc.backoff)) { changes.add("backoff"); } + if (metadata != null && !mapsAreEquivalent(metadata, serverCcc.metadata)) { changes.add("metadata"); } // do not need to check Durable because the original is retrieved by the durable name @@ -528,24 +551,6 @@ public void onMessage(Message msg) throws InterruptedException { } } - private boolean isFilterMatch(String subscribeSubject, String filterSubject, String stream) throws IOException, JetStreamApiException { - - // subscribeSubject guaranteed to not be null - // filterSubject may be null or empty or have value - - if (subscribeSubject.equals(filterSubject)) { - return true; - } - - if (nullOrEmpty(filterSubject) || filterSubject.equals(">")) { - // lookup stream subject returns null if there is not exactly one subject - String streamSubject = lookupStreamSubject(stream); - return subscribeSubject.equals(streamSubject); - } - - return false; - } - private String lookupStreamSubject(String stream) throws IOException, JetStreamApiException { StreamInfo si = _getStreamInfo(stream, null); List streamSubjects = si.getConfiguration().getSubjects(); @@ -556,101 +561,76 @@ private String lookupStreamSubject(String stream) throws IOException, JetStreamA * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException { - subject = validateSubject(subject, true); - return createSubscription(subject, null, null, null, false, null, null); + public JetStreamSubscription subscribe(String subscribeSubject) throws IOException, JetStreamApiException { + return createSubscription(subscribeSubject, null, null, null, null, null, false); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException { - subject = extractAndValidateSubject(subject, options); - return createSubscription(subject, null, null, null, false, options, null); + public JetStreamSubscription subscribe(String subscribeSubject, PushSubscribeOptions options) throws IOException, JetStreamApiException { + return createSubscription(subscribeSubject, options, null, null, null, null, false); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException { - subject = extractAndValidateSubject(subject, options); + public JetStreamSubscription subscribe(String subscribeSubject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException { queue = validateQueueName(emptyAsNull(queue), false); - return createSubscription(subject, queue, null, null, false, options, null); + return createSubscription(subscribeSubject, options, null, queue, null, null, false); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException { - subject = validateSubject(subject, true); + public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException { validateNotNull(dispatcher, "Dispatcher"); validateNotNull(handler, "Handler"); - return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, autoAck, null, null); + return createSubscription(subscribeSubject, null, null, null, (NatsDispatcher) dispatcher, handler, autoAck); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException { - subject = extractAndValidateSubject(subject, options); + public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException { validateNotNull(dispatcher, "Dispatcher"); validateNotNull(handler, "Handler"); - return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, autoAck, options, null); + return createSubscription(subscribeSubject, options, null, null, (NatsDispatcher) dispatcher, handler, autoAck); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException { - subject = extractAndValidateSubject(subject, options); + public JetStreamSubscription subscribe(String subscribeSubject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException { queue = validateQueueName(emptyAsNull(queue), false); validateNotNull(dispatcher, "Dispatcher"); validateNotNull(handler, "Handler"); - return createSubscription(subject, queue, (NatsDispatcher) dispatcher, handler, autoAck, options, null); + return createSubscription(subscribeSubject, options, null, queue, (NatsDispatcher) dispatcher, handler, autoAck); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException { - subject = extractAndValidateSubject(subject, options); + public JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException { validateNotNull(options, "Pull Subscribe Options"); - return createSubscription(subject, null, null, null, false, null, options); + return createSubscription(subscribeSubject, null, options, null, null, null, false); } /** * {@inheritDoc} */ @Override - public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException { - subject = extractAndValidateSubject(subject, options); + public JetStreamSubscription subscribe(String subscribeSubject, Dispatcher dispatcher, MessageHandler handler, PullSubscribeOptions options) throws IOException, JetStreamApiException { validateNotNull(dispatcher, "Dispatcher"); validateNotNull(handler, "Handler"); validateNotNull(options, "Pull Subscribe Options"); - return createSubscription(subject, null, (NatsDispatcher) dispatcher, handler, false, null, options); - } - - private String extractAndValidateSubject(String supplied, SubscribeOptions options) { - if (supplied != null) { - return validateSubject(supplied, isSubjectRequired(options)); - } - if (options != null) { - ConsumerConfiguration cc = options.getConsumerConfiguration(); - if (cc != null) { - return validateSubject(cc.getFilterSubject(), isSubjectRequired(options)); - } - } - return validateSubject(null, isSubjectRequired(options)); - } - - private boolean isSubjectRequired(SubscribeOptions options) { - return options == null || !options.isBind(); + return createSubscription(subscribeSubject, null, options, null, (NatsDispatcher) dispatcher, handler, false); } /** diff --git a/src/main/java/io/nats/client/support/Validator.java b/src/main/java/io/nats/client/support/Validator.java index 0544cf807..7d4d4bbbe 100644 --- a/src/main/java/io/nats/client/support/Validator.java +++ b/src/main/java/io/nats/client/support/Validator.java @@ -27,26 +27,57 @@ private Validator() { } /* ensures cannot be constructed */ public static String validateSubject(String s, boolean required) { - return validateSubject(s, "Subject", required, false); + return validateSubject(s, "Subject", required); } public static String validateSubject(String subject, String label, boolean required, boolean cantEndWithGt) { + subject = validateSubject(subject, label, required); + if (cantEndWithGt && subject.endsWith(".>")) { + throw new IllegalArgumentException(label + " last segment cannot be '>'"); + } + return subject; + } + + /* + cannot contain spaces \r \n \t + cannot start or with subject token delimiter . + some things don't allow it to end greater + */ + public static String validateSubject(String subject, String label, boolean required) { if (emptyAsNull(subject) == null) { if (required) { throw new IllegalArgumentException(label + " cannot be null or empty."); } return null; } + + subject = subject.trim(); String[] segments = subject.split("\\."); - for (int x = 0; x < segments.length; x++) { - String segment = segments[x]; - if (segment.equals(">")) { - if (cantEndWithGt || x != segments.length - 1) { // if it can end with gt, gt must be last segment - throw new IllegalArgumentException(label + " cannot contain '>'"); + for (int seg = 0; seg < segments.length; seg++) { + String segment = segments[seg]; + int sl = segment.length(); + if (sl == 0) { + if (seg == 0) { + throw new IllegalArgumentException(label + " cannot start with '.'"); + } + throw new IllegalArgumentException(label + " segment cannot be empty"); + } + else { + for (int m = 0; m < sl; m++) { + char c = segment.charAt(m); + switch (c) { + case 32: + case '\r': + case '\n': + throw new IllegalArgumentException(label + " cannot contain space, carriage return or linefeed character"); + case '*': + case '>': + if (sl != 1) { + throw new IllegalArgumentException(label + " wildcard improperly placed."); + } + break; + } } - } - else if (!segment.equals("*") && notPrintable(segment)) { - throw new IllegalArgumentException(label + " must be printable characters only."); } } return subject; @@ -561,57 +592,40 @@ public static boolean isSemVer(String s) { return SEMVER_PATTERN.matcher(s).find(); } - public static boolean listsAreEqual(List l1, List l2, boolean nullSecondEqualsEmptyFirst) + public static boolean listsAreEquivalent(List l1, List l2) { - if (l1 == null) - { - return l2 == null; - } - - if (l2 == null) - { - return nullSecondEqualsEmptyFirst && l1.isEmpty(); - } + int s1 = l1 == null ? 0 : l1.size(); + int s2 = l2 == null ? 0 : l2.size(); - return l1.equals(l2); - } - - public static boolean notNullListsAreEquivalent(List l1, List l2) - { - if (l1.size() != l2.size()) { + if (s1 != s2) { return false; } - for (T t : l1) { - if (!l2.contains(t)) { - return false; + if (s1 > 0) { + for (T t : l1) { + if (!l2.contains(t)) { + return false; + } } } - return true; } - - public static boolean mapsAreEqual(Map m1, Map m2, boolean nullSecondEqualsEmptyFirst) + public static boolean mapsAreEquivalent(Map m1, Map m2) { - if (m1 == null) - { - return m2 == null; - } + int s1 = m1 == null ? 0 : m1.size(); + int s2 = m2 == null ? 0 : m2.size(); - if (m2 == null) - { - return nullSecondEqualsEmptyFirst && m1.isEmpty(); - } - - if (m1.size() != m2.size()) { + if (s1 != s2) { return false; } - for (Map.Entry entry : m1.entrySet()) - { - if (!entry.getValue().equals(m2.get(entry.getKey()))) { - return false; + if (s1 > 0) { + for (Map.Entry entry : m1.entrySet()) + { + if (!entry.getValue().equals(m2.get(entry.getKey()))) { + return false; + } } } diff --git a/src/main/java/io/nats/service/Endpoint.java b/src/main/java/io/nats/service/Endpoint.java index 339d7aaab..cbbf3d995 100644 --- a/src/main/java/io/nats/service/Endpoint.java +++ b/src/main/java/io/nats/service/Endpoint.java @@ -85,7 +85,7 @@ public Endpoint(String name, String subject, Map metadata) { this.subject = this.name; } else { - this.subject = validateSubject(subject, "Endpoint Subject", false, false); + this.subject = validateSubject(subject, "Endpoint Subject", false); } } else { diff --git a/src/main/java/io/nats/service/Group.java b/src/main/java/io/nats/service/Group.java index c6d65ac39..0c692e067 100644 --- a/src/main/java/io/nats/service/Group.java +++ b/src/main/java/io/nats/service/Group.java @@ -16,6 +16,7 @@ import java.util.Objects; import static io.nats.client.support.NatsConstants.DOT; +import static io.nats.client.support.Validator.emptyAsNull; import static io.nats.client.support.Validator.validateSubject; /** @@ -27,11 +28,20 @@ public class Group { /** * Construct a group. - *

Group names and subjects are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'

+ *

Group names are considered 'Restricted Terms' and must only contain A-Z, a-z, 0-9, '-' or '_'

* @param name the group name */ public Group(String name) { - this.name = validateSubject(name, "Group Name", true, true); + name = emptyAsNull(name); + if (name == null) { + throw new IllegalArgumentException("Group name cannot be null or empty."); + } + + if (name.contains(">")) { + throw new IllegalArgumentException("Group name cannot contain '>'."); + } + + this.name = validateSubject(name, "Group name", false); } /** diff --git a/src/test/java/io/nats/client/JetStreamOptionsTests.java b/src/test/java/io/nats/client/JetStreamOptionsTests.java index 0d248913a..b1388d783 100644 --- a/src/test/java/io/nats/client/JetStreamOptionsTests.java +++ b/src/test/java/io/nats/client/JetStreamOptionsTests.java @@ -103,8 +103,8 @@ public void testPrefixValidation() { assertValidPrefix(HAS_TIC); assertInvalidPrefix(HAS_SPACE); - assertInvalidPrefix(HAS_STAR); - assertInvalidPrefix(HAS_GT); + assertInvalidPrefix(STAR_NOT_SEGMENT); + assertInvalidPrefix(GT_NOT_SEGMENT); assertInvalidPrefix(HAS_LOW); assertInvalidPrefix(HAS_127); @@ -145,8 +145,8 @@ public void testDomainValidation() { assertValidDomain(HAS_TIC); assertInvalidDomain(HAS_SPACE); - assertInvalidDomain(HAS_STAR); - assertInvalidDomain(HAS_GT); + assertInvalidDomain(STAR_NOT_SEGMENT); + assertInvalidDomain(GT_NOT_SEGMENT); assertInvalidDomain(HAS_LOW); assertInvalidDomain(HAS_127); diff --git a/src/test/java/io/nats/client/SubscribeOptionsTests.java b/src/test/java/io/nats/client/SubscribeOptionsTests.java index c1b27e9f2..3ca4b61a8 100644 --- a/src/test/java/io/nats/client/SubscribeOptionsTests.java +++ b/src/test/java/io/nats/client/SubscribeOptionsTests.java @@ -24,7 +24,7 @@ import static org.junit.jupiter.api.Assertions.*; public class SubscribeOptionsTests extends TestBase { - private static final String[] badNames = {HAS_DOT, HAS_GT, HAS_STAR, HAS_FWD_SLASH, HAS_BACK_SLASH}; + private static final String[] badNames = {HAS_DOT, GT_NOT_SEGMENT, STAR_NOT_SEGMENT, HAS_FWD_SLASH, HAS_BACK_SLASH}; @Test public void testPushAffirmative() { diff --git a/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java b/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java index d1cfb4eb3..b5f6f6090 100644 --- a/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java +++ b/src/test/java/io/nats/client/api/KeyValueConfigurationTests.java @@ -84,8 +84,8 @@ public void testConstructionInvalidsCoverage() { assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_SPACE)); assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_PRINTABLE)); assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_DOT)); - assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_STAR)); - assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_GT)); + assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(STAR_NOT_SEGMENT)); + assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(GT_NOT_SEGMENT)); assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_DOLLAR)); assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().name(HAS_LOW)); assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder(HAS_127)); diff --git a/src/test/java/io/nats/client/api/ObjectStoreApiTests.java b/src/test/java/io/nats/client/api/ObjectStoreApiTests.java index 16836e8c8..18d1d7fb0 100644 --- a/src/test/java/io/nats/client/api/ObjectStoreApiTests.java +++ b/src/test/java/io/nats/client/api/ObjectStoreApiTests.java @@ -329,8 +329,8 @@ public void testConstructionInvalidsCoverage() { assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_SPACE)); assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_PRINTABLE)); assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_DOT)); - assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_STAR)); - assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_GT)); + assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(STAR_NOT_SEGMENT)); + assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(GT_NOT_SEGMENT)); assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_DOLLAR)); assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder().name(HAS_LOW)); assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.builder(HAS_127)); diff --git a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java index ecd615096..cdc21c050 100644 --- a/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamGeneralTests.java @@ -277,8 +277,6 @@ public void testJetStreamSubscribeErrors() throws Exception { // subject IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(null)); assertTrue(iae.getMessage().startsWith("Subject")); - iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(HAS_SPACE)); - assertTrue(iae.getMessage().startsWith("Subject")); iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(null, (PushSubscribeOptions)null)); assertTrue(iae.getMessage().startsWith("Subject")); @@ -347,7 +345,7 @@ public void testFilterSubjectEphemeral() throws Exception { // subscribe to A cc = builder().filterSubject(subjectA).ackPolicy(AckPolicy.None).build(); pso = PushSubscribeOptions.builder().configuration(cc).build(); - sub = js.subscribe(subjectWild, pso); + sub = js.subscribe(subjectA, pso); nc.flush(Duration.ofSeconds(1)); m = sub.nextMessage(Duration.ofSeconds(1)); @@ -362,7 +360,7 @@ public void testFilterSubjectEphemeral() throws Exception { // subscribe to B cc = builder().filterSubject(subjectB).ackPolicy(AckPolicy.None).build(); pso = PushSubscribeOptions.builder().configuration(cc).build(); - sub = js.subscribe(subjectWild, pso); + sub = js.subscribe(subjectB, pso); nc.flush(Duration.ofSeconds(1)); m = sub.nextMessage(Duration.ofSeconds(1)); @@ -567,80 +565,74 @@ public void testFilterMismatchErrors() throws Exception { createMemoryStream(jsm, STREAM, SUBJECT); // will work as SubscribeSubject equals Filter Subject - subscribeOk(js, jsm, SUBJECT, SUBJECT); - subscribeOk(js, jsm, ">", ">"); - subscribeOk(js, jsm, "*", "*"); - - // will work as SubscribeSubject != empty Filter Subject, - // b/c Stream has exactly 1 subject and is a match. - subscribeOk(js, jsm, "", SUBJECT); - - // will work as SubscribeSubject != Filter Subject of '>' - // b/c Stream has exactly 1 subject and is a match. - subscribeOk(js, jsm, ">", SUBJECT); + filterMatchSubscribeOk(js, jsm, SUBJECT, SUBJECT); + filterMatchSubscribeOk(js, jsm, ">", ">"); + filterMatchSubscribeOk(js, jsm, "*", "*"); // will not work - subscribeEx(js, jsm, "*", SUBJECT); + filterMatchSubscribeEx(js, jsm, SUBJECT, ""); + filterMatchSubscribeEx(js, jsm, SUBJECT, ">"); + filterMatchSubscribeEx(js, jsm, SUBJECT, "*"); // multiple subjects no wildcards jsm.deleteStream(STREAM); createMemoryStream(jsm, STREAM, SUBJECT, subject(2)); // will work as SubscribeSubject equals Filter Subject - subscribeOk(js, jsm, SUBJECT, SUBJECT); - subscribeOk(js, jsm, ">", ">"); - subscribeOk(js, jsm, "*", "*"); + filterMatchSubscribeOk(js, jsm, SUBJECT, SUBJECT); + filterMatchSubscribeOk(js, jsm, ">", ">"); + filterMatchSubscribeOk(js, jsm, "*", "*"); // will not work because stream has more than 1 subject - subscribeEx(js, jsm, "", SUBJECT); - subscribeEx(js, jsm, ">", SUBJECT); - subscribeEx(js, jsm, "*", SUBJECT); + filterMatchSubscribeEx(js, jsm, SUBJECT, ""); + filterMatchSubscribeEx(js, jsm, SUBJECT, ">"); + filterMatchSubscribeEx(js, jsm, SUBJECT, "*"); // multiple subjects via '>' jsm.deleteStream(STREAM); createMemoryStream(jsm, STREAM, SUBJECT_GT); // will work, exact matches - subscribeOk(js, jsm, subjectDot("1"), subjectDot("1")); - subscribeOk(js, jsm, ">", ">"); + filterMatchSubscribeOk(js, jsm, subjectDot("1"), subjectDot("1")); + filterMatchSubscribeOk(js, jsm, ">", ">"); // will not work because mismatch / stream has more than 1 subject - subscribeEx(js, jsm, "", subjectDot("1")); - subscribeEx(js, jsm, ">", subjectDot("1")); - subscribeEx(js, jsm, SUBJECT_GT, subjectDot("1")); + filterMatchSubscribeEx(js, jsm, subjectDot("1"), ""); + filterMatchSubscribeEx(js, jsm, subjectDot("1"), ">"); + filterMatchSubscribeEx(js, jsm, subjectDot("1"), SUBJECT_GT); // multiple subjects via '*' jsm.deleteStream(STREAM); createMemoryStream(jsm, STREAM, SUBJECT_STAR); // will work, exact matches - subscribeOk(js, jsm, subjectDot("1"), subjectDot("1")); - subscribeOk(js, jsm, ">", ">"); + filterMatchSubscribeOk(js, jsm, subjectDot("1"), subjectDot("1")); + filterMatchSubscribeOk(js, jsm, ">", ">"); // will not work because mismatch / stream has more than 1 subject - subscribeEx(js, jsm, "", subjectDot("1")); - subscribeEx(js, jsm, ">", subjectDot("1")); - subscribeEx(js, jsm, SUBJECT_STAR, subjectDot("1")); + filterMatchSubscribeEx(js, jsm, subjectDot("1"), ""); + filterMatchSubscribeEx(js, jsm, subjectDot("1"), ">"); + filterMatchSubscribeEx(js, jsm, subjectDot("1"), SUBJECT_STAR); }); } - private void subscribeOk(JetStream js, JetStreamManagement jsm, String fs, String ss) throws IOException, JetStreamApiException { + private void filterMatchSubscribeOk(JetStream js, JetStreamManagement jsm, String subscribeSubject, String... filterSubjects) throws IOException, JetStreamApiException { int i = RandomUtils.PRAND.nextInt(); // just want a unique number - setupConsumer(jsm, i, fs); - unsubscribeEnsureNotBound(js.subscribe(ss, builder().durable(durable(i)).buildPushSubscribeOptions())); + filterMatchSetupConsumer(jsm, i, filterSubjects); + unsubscribeEnsureNotBound(js.subscribe(subscribeSubject, builder().durable(durable(i)).buildPushSubscribeOptions())); } - private void subscribeEx(JetStream js, JetStreamManagement jsm, String fs, String ss) throws IOException, JetStreamApiException { + private void filterMatchSubscribeEx(JetStream js, JetStreamManagement jsm, String subscribeSubject, String... filterSubjects) throws IOException, JetStreamApiException { int i = RandomUtils.PRAND.nextInt(); // just want a unique number - setupConsumer(jsm, i, fs); + filterMatchSetupConsumer(jsm, i, filterSubjects); IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, - () -> js.subscribe(ss, builder().durable(durable(i)).buildPushSubscribeOptions())); + () -> js.subscribe(subscribeSubject, builder().durable(durable(i)).buildPushSubscribeOptions())); assertTrue(iae.getMessage().contains(JsSubSubjectDoesNotMatchFilter.id())); } - private void setupConsumer(JetStreamManagement jsm, int i, String fs) throws IOException, JetStreamApiException { + private void filterMatchSetupConsumer(JetStreamManagement jsm, int i, String... fs) throws IOException, JetStreamApiException { jsm.addOrUpdateConsumer(STREAM, - builder().deliverSubject(deliver(i)).durable(durable(i)).filterSubject(fs).build()); + builder().deliverSubject(deliver(i)).durable(durable(i)).filterSubjects(fs).build()); } @Test @@ -652,17 +644,19 @@ public void testBindDurableDeliverSubject() throws Exception { // create the stream. createDefaultTestStream(jsm); - // create a durable push subscriber - has deliver subject + // create a durable push subscriber - has a deliver subject ConsumerConfiguration ccDurPush = builder() - .durable(durable(1)) - .deliverSubject(deliver(1)) - .build(); + .durable(durable(1)) + .deliverSubject(deliver(1)) + .filterSubject(SUBJECT) + .build(); jsm.addOrUpdateConsumer(STREAM, ccDurPush); // create a durable pull subscriber - notice no deliver subject ConsumerConfiguration ccDurPull = builder() - .durable(durable(2)) - .build(); + .durable(durable(2)) + .filterSubject(SUBJECT) + .build(); jsm.addOrUpdateConsumer(STREAM, ccDurPull); // try to pull subscribe against a push durable @@ -677,10 +671,6 @@ public void testBindDurableDeliverSubject() throws Exception { ); assertTrue(iae.getMessage().contains(JsSubConsumerAlreadyConfiguredAsPush.id())); - // this one is okay - JetStreamSubscription sub = js.subscribe(SUBJECT, PullSubscribeOptions.builder().durable(durable(2)).build()); - unsubscribeEnsureNotBound(sub); // so I can re-use the durable - // try to push subscribe against a pull durable iae = assertThrows(IllegalArgumentException.class, () -> js.subscribe(SUBJECT, PushSubscribeOptions.builder().durable(durable(2)).build()) @@ -719,6 +709,7 @@ public void testConsumerIsNotModified() throws Exception { .maxBatch(10) .maxBytes(11) .replayPolicy(ReplayPolicy.Instant) + .filterSubject(SUBJECT) .build(); jsm.addOrUpdateConsumer(STREAM, cc); @@ -736,6 +727,7 @@ public void testConsumerIsNotModified() throws Exception { .maxBytes(48) .rateLimit(44) .maxAckPending(45) + .filterSubject(SUBJECT) .build(); jsm.addOrUpdateConsumer(STREAM, cc); @@ -745,6 +737,7 @@ public void testConsumerIsNotModified() throws Exception { cc = builder() .durable(durable(22)) .maxPullWaiting(46) + .filterSubject(SUBJECT) .build(); jsm.addOrUpdateConsumer(STREAM, cc); @@ -757,6 +750,7 @@ public void testConsumerIsNotModified() throws Exception { .deliverSubject(deliver(3)) .durable(durable(3)) .startTime(ZonedDateTime.now().plusHours(1)) + .filterSubject(SUBJECT) .build(); jsm.addOrUpdateConsumer(STREAM, cc); @@ -771,6 +765,7 @@ public void testConsumerIsNotModified() throws Exception { .headersOnly(true) .maxExpires(30000) .ackWait(2000) + .filterSubject(SUBJECT) .build(); jsm.addOrUpdateConsumer(STREAM, cc); @@ -784,6 +779,7 @@ public void testConsumerIsNotModified() throws Exception { .deliverPolicy(DeliverPolicy.Last) .ackPolicy(AckPolicy.None) .replayPolicy(ReplayPolicy.Original) + .filterSubject(SUBJECT) .build(); jsm.addOrUpdateConsumer(STREAM, cc); @@ -909,11 +905,11 @@ private void _changeEx(IllegalArgumentException iae, String changedField) { } private Builder pushDurableBuilder() { - return builder().durable(PUSH_DURABLE).deliverSubject(DELIVER); + return builder().durable(PUSH_DURABLE).deliverSubject(DELIVER).filterSubject(SUBJECT); } private Builder pullDurableBuilder() { - return builder().durable(PULL_DURABLE); + return builder().durable(PULL_DURABLE).filterSubject(SUBJECT); } @Test diff --git a/src/test/java/io/nats/client/impl/JetStreamPullTests.java b/src/test/java/io/nats/client/impl/JetStreamPullTests.java index 5fc7bc043..4408b77b4 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPullTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPullTests.java @@ -121,13 +121,13 @@ public void testIterate() throws Exception { Duration ackWaitDur = Duration.ofMillis(fetchMs * 2); ConsumerConfiguration cc = ConsumerConfiguration.builder() - .ackWait(ackWaitDur) - .build(); + .ackWait(ackWaitDur) + .build(); PullSubscribeOptions options = PullSubscribeOptions.builder() - .durable(DURABLE) - .configuration(cc) - .build(); + .durable(DURABLE) + .configuration(cc) + .build(); JetStreamSubscription sub = js.subscribe(SUBJECT, options); assertSubscription(sub, STREAM, DURABLE, null, true); @@ -638,7 +638,7 @@ public void testDurable() throws Exception { @Test public void testNamed() throws Exception { - runInJsServer(nc -> { + runInJsServer(this::atLeast290, nc -> { JetStream js = nc.jetStream(); JetStreamManagement jsm = nc.jetStreamManagement(); @@ -1017,7 +1017,7 @@ public void testExceedsMaxRequestBytesNthMessageSyncSub() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build()); + jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).filterSubjects(SUBJECT).build()); PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, durable(1)); JetStreamSubscription sub = js.subscribe(SUBJECT, so); @@ -1049,7 +1049,7 @@ public void testExceedsMaxRequestBytesExactBytes() throws Exception { createDefaultTestStream(nc); JetStreamManagement jsm = nc.jetStreamManagement(); JetStream js = nc.jetStream(); - jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build()); + jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).ackPolicy(AckPolicy.None).filterSubjects(SUBJECT).build()); PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, durable(1)); JetStreamSubscription sub = js.subscribe(SUBJECT, so); @@ -1079,7 +1079,7 @@ public void testReader() throws Exception { JetStream js = nc.jetStream(); // Pre define a consumer - ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build(); + ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).filterSubjects(SUBJECT).build(); jsm.addOrUpdateConsumer(STREAM, cc); PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, DURABLE); diff --git a/src/test/java/io/nats/client/impl/JetStreamTestBase.java b/src/test/java/io/nats/client/impl/JetStreamTestBase.java index e33979fb6..f12b35bad 100644 --- a/src/test/java/io/nats/client/impl/JetStreamTestBase.java +++ b/src/test/java/io/nats/client/impl/JetStreamTestBase.java @@ -48,7 +48,11 @@ public class JetStreamTestBase extends TestBase { public static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(1000); public boolean atLeast290(Connection nc) { - return nc.getServerInfo().isSameOrNewerThanVersion("2.9.0"); + return atLeast290(nc.getServerInfo()); + } + + public boolean atLeast290(ServerInfo si) { + return si.isSameOrNewerThanVersion("2.9.0"); } public boolean atLeast291(ServerInfo si) { diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 3c012a328..5f9d77d82 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -601,70 +601,80 @@ public void testOrderedBehaviors() throws Exception { CreateStreamResult csr = createMemoryStream(jsm); StreamContext sc = js.getStreamContext(csr.stream); jsPublish(js, csr.subject, 101, 6); - testOrderedBehaviorNext(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject)); - try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {}; - // Get this in place before subscriptions are made - ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; - - csr = createMemoryStream(jsm); - sc = js.getStreamContext(csr.stream); - jsPublish(js, csr.subject, 101, 6); - testOrderedBehaviorFetch(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject)); - try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {}; - - csr = createMemoryStream(jsm); - sc = js.getStreamContext(csr.stream); - jsPublish(js, csr.subject, 101, 6); - testOrderedBehaviorIterable(sc, new OrderedConsumerConfiguration().filterSubject(csr.subject)); - try { jsm.deleteStream(csr.stream); } catch (Exception ignore) {}; - }); - } - - private static void testOrderedBehaviorNext(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { - OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); - // Loop through the messages to make sure I get stream sequence 1 to 6 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 6) { - Message m = ctx.next(1000); - if (m != null) { - assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(1, m.metaData().consumerSequence()); - ++expectedStreamSeq; - } - } - } - - private static void testOrderedBehaviorFetch(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { - OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); - try (FetchConsumer fcon = ctx.fetchMessages(6)) { + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(csr.subject); + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); // Loop through the messages to make sure I get stream sequence 1 to 6 int expectedStreamSeq = 1; while (expectedStreamSeq <= 6) { - Message m = fcon.nextMessage(); + Message m = ctx.next(1000); if (m != null) { assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + assertEquals(1, m.metaData().consumerSequence()); ++expectedStreamSeq; } } - } + }); } - private static void testOrderedBehaviorIterable(StreamContext sc, OrderedConsumerConfiguration occ) throws Exception { - OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); - try (IterableConsumer icon = ctx.iterate()) { - // Loop through the messages to make sure I get stream sequence 1 to 6 - int expectedStreamSeq = 1; - while (expectedStreamSeq <= 6) { - Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage - if (m != null) { - assertEquals(expectedStreamSeq, m.metaData().streamSequence()); - assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); - ++expectedStreamSeq; + @Test + public void testOrderedBehaviorFetch() throws Exception { + runInJsServer(this::atLeast291, nc -> { + // Setup + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + // Get this in place before subscriptions are made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; + + CreateStreamResult csr = createMemoryStream(jsm); + StreamContext sc = js.getStreamContext(csr.stream); + jsPublish(js, csr.subject, 101, 6); + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(csr.subject); + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + try (FetchConsumer fcon = ctx.fetchMessages(6)) { + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = fcon.nextMessage(); + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + ++expectedStreamSeq; + } } } - } + }); + } + + @Test + public void testOrderedBehaviorIterable() throws Exception { + runInJsServer(this::atLeast291, nc -> { + // Setup + JetStream js = nc.jetStream(); + JetStreamManagement jsm = nc.jetStreamManagement(); + + // Get this in place before subscriptions are made + ((NatsJetStream)js)._pullOrderedMessageManagerFactory = OrderedPullTestDropSimulator::new; + + CreateStreamResult csr = createMemoryStream(jsm); + StreamContext sc = js.getStreamContext(csr.stream); + jsPublish(js, csr.subject, 101, 6); + OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(csr.subject); + OrderedConsumerContext ctx = sc.createOrderedConsumer(occ); + try (IterableConsumer icon = ctx.iterate()) { + // Loop through the messages to make sure I get stream sequence 1 to 6 + int expectedStreamSeq = 1; + while (expectedStreamSeq <= 6) { + Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage + if (m != null) { + assertEquals(expectedStreamSeq, m.metaData().streamSequence()); + assertEquals(EXPECTED_CON_SEQ_NUMS[expectedStreamSeq-1], m.metaData().consumerSequence()); + ++expectedStreamSeq; + } + } + } + }); } @Test diff --git a/src/test/java/io/nats/client/support/JsonParsingTests.java b/src/test/java/io/nats/client/support/JsonParsingTests.java index 38e50df61..9f694b8d0 100644 --- a/src/test/java/io/nats/client/support/JsonParsingTests.java +++ b/src/test/java/io/nats/client/support/JsonParsingTests.java @@ -64,8 +64,8 @@ public void testStringParsing() { addField(key(x++), HAS_SPACE, oMap, list, encodeds, decodeds); addField(key(x++), HAS_PRINTABLE, oMap, list, encodeds, decodeds); addField(key(x++), HAS_DOT, oMap, list, encodeds, decodeds); - addField(key(x++), HAS_STAR, oMap, list, encodeds, decodeds); - addField(key(x++), HAS_GT, oMap, list, encodeds, decodeds); + addField(key(x++), STAR_NOT_SEGMENT, oMap, list, encodeds, decodeds); + addField(key(x++), GT_NOT_SEGMENT, oMap, list, encodeds, decodeds); addField(key(x++), HAS_DASH, oMap, list, encodeds, decodeds); addField(key(x++), HAS_UNDER, oMap, list, encodeds, decodeds); addField(key(x++), HAS_DOLLAR, oMap, list, encodeds, decodeds); diff --git a/src/test/java/io/nats/client/support/ValidatorTests.java b/src/test/java/io/nats/client/support/ValidatorTests.java index 5d6aedcb9..eddfe602e 100644 --- a/src/test/java/io/nats/client/support/ValidatorTests.java +++ b/src/test/java/io/nats/client/support/ValidatorTests.java @@ -35,19 +35,26 @@ public static void beforeAll() { @Test public void testValidateSubject() { - allowedRequired(Validator::validateSubject, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_STAR, HAS_GT, HAS_DOLLAR)); - notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_LOW, HAS_127)); - notAllowedRequired(Validator::validateSubject, UTF_ONLY_STRINGS); - allowedNotRequiredEmptyAsNull(Validator::validateSubject, Arrays.asList(null, EMPTY)); - - notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_LOW, HAS_127)); + // subject is required + allowedRequired(Validator::validateSubject, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_DOLLAR, HAS_LOW, HAS_127)); + allowedRequired(Validator::validateSubject, UTF_ONLY_STRINGS); + allowedRequired(Validator::validateSubject, Arrays.asList(STAR_SEGMENT, GT_LAST_SEGMENT)); + notAllowedRequired(Validator::validateSubject, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_CR, HAS_LF)); + notAllowedRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT)); + + // subject not required, null and empty both mean not supplied allowedNotRequiredEmptyAsNull(Validator::validateSubject, Arrays.asList(null, EMPTY)); + allowedNotRequired(Validator::validateSubject, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_DOLLAR, HAS_LOW, HAS_127)); + allowedNotRequired(Validator::validateSubject, UTF_ONLY_STRINGS); + allowedNotRequired(Validator::validateSubject, Arrays.asList(STAR_SEGMENT, GT_LAST_SEGMENT)); + notAllowedNotRequired(Validator::validateSubject, Arrays.asList(HAS_SPACE, HAS_CR, HAS_LF)); + notAllowedNotRequired(Validator::validateSubject, Arrays.asList(STARTS_WITH_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT)); } @Test public void testValidateReplyTo() { allowedRequired(Validator::validateReplyTo, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOT, HAS_DOLLAR)); - notAllowedRequired(Validator::validateReplyTo, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_STAR, HAS_GT, HAS_LOW, HAS_127)); + notAllowedRequired(Validator::validateReplyTo, Arrays.asList(null, EMPTY, HAS_SPACE, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127)); notAllowedRequired(Validator::validateReplyTo, UTF_ONLY_STRINGS); allowedNotRequiredEmptyAsNull(Validator::validateReplyTo, Arrays.asList(null, EMPTY)); } @@ -56,7 +63,7 @@ public void testValidateReplyTo() { public void testValidateQueueName() { // validateQueueName(String s, boolean required) allowedRequired(Validator::validateQueueName, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOLLAR)); - notAllowedRequired(Validator::validateQueueName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, HAS_STAR, HAS_GT, HAS_LOW, HAS_127)); + notAllowedRequired(Validator::validateQueueName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127)); notAllowedRequired(Validator::validateQueueName, UTF_ONLY_STRINGS); allowedNotRequiredEmptyAsNull(Validator::validateQueueName, Arrays.asList(null, EMPTY)); } @@ -64,7 +71,7 @@ public void testValidateQueueName() { @Test public void testValidateStreamName() { allowedRequired(Validator::validateStreamName, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOLLAR)); - notAllowedRequired(Validator::validateStreamName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, HAS_STAR, HAS_GT, HAS_LOW, HAS_127)); + notAllowedRequired(Validator::validateStreamName, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127)); notAllowedRequired(Validator::validateStreamName, UTF_ONLY_STRINGS); allowedNotRequiredEmptyAsNull(Validator::validateStreamName, Arrays.asList(null, EMPTY)); } @@ -72,7 +79,7 @@ public void testValidateStreamName() { @Test public void testValidateDurable() { allowedRequired(Validator::validateDurable, Arrays.asList(PLAIN, HAS_PRINTABLE, HAS_DOLLAR)); - notAllowedRequired(Validator::validateDurable, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, HAS_STAR, HAS_GT, HAS_LOW, HAS_127)); + notAllowedRequired(Validator::validateDurable, Arrays.asList(null, EMPTY, HAS_SPACE, HAS_DOT, STAR_NOT_SEGMENT, GT_NOT_SEGMENT, HAS_LOW, HAS_127)); notAllowedRequired(Validator::validateDurable, UTF_ONLY_STRINGS); allowedNotRequiredEmptyAsNull(Validator::validateDurable, Arrays.asList(null, EMPTY)); } @@ -82,8 +89,8 @@ public void testValidatePrintable() { validatePrintable(PLAIN, "label", true); validatePrintable(HAS_PRINTABLE, "label", true); validatePrintable(HAS_DOT, "label", true); - validatePrintable(HAS_STAR, "label", true); - validatePrintable(HAS_GT, "label", true); + validatePrintable(STAR_NOT_SEGMENT, "label", true); + validatePrintable(GT_NOT_SEGMENT, "label", true); validatePrintable(HAS_DASH, "label", true); validatePrintable(HAS_UNDER, "label", true); validatePrintable(HAS_DOLLAR, "label", true); @@ -229,8 +236,8 @@ public void testValidateBucketName() { assertThrows(IllegalArgumentException.class, () -> validateBucketName(null, true)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_SPACE, true)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOT, true)); - assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_STAR, true)); - assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_GT, true)); + assertThrows(IllegalArgumentException.class, () -> validateBucketName(STAR_NOT_SEGMENT, true)); + assertThrows(IllegalArgumentException.class, () -> validateBucketName(GT_NOT_SEGMENT, true)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOLLAR, true)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_LOW, true)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_127, true)); @@ -246,8 +253,8 @@ public void testValidateBucketName() { validateBucketName(null, false); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_SPACE, false)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOT, false)); - assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_STAR, false)); - assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_GT, false)); + assertThrows(IllegalArgumentException.class, () -> validateBucketName(STAR_NOT_SEGMENT, false)); + assertThrows(IllegalArgumentException.class, () -> validateBucketName(GT_NOT_SEGMENT, false)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_DOLLAR, false)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_LOW, false)); assertThrows(IllegalArgumentException.class, () -> validateBucketName(HAS_127, false)); @@ -265,8 +272,8 @@ public void testValidateWildcardKeyRequired() { validateKvKeyWildcardAllowedRequired(HAS_FWD_SLASH); validateKvKeyWildcardAllowedRequired(HAS_EQUALS); validateKvKeyWildcardAllowedRequired(HAS_DOT); - validateKvKeyWildcardAllowedRequired(HAS_STAR); - validateKvKeyWildcardAllowedRequired(HAS_GT); + validateKvKeyWildcardAllowedRequired(STAR_NOT_SEGMENT); + validateKvKeyWildcardAllowedRequired(GT_NOT_SEGMENT); validateKvKeyWildcardAllowedRequired("numbers9ok"); assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(null)); assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_SPACE)); @@ -290,8 +297,8 @@ public void testValidateNonWildcardKeyRequired() { validateNonWildcardKvKeyRequired("numbers9ok"); assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(null)); assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_SPACE)); - assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_STAR)); - assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_GT)); + assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(STAR_NOT_SEGMENT)); + assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(GT_NOT_SEGMENT)); assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_DOLLAR)); assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_LOW)); assertThrows(IllegalArgumentException.class, () -> validateNonWildcardKvKeyRequired(HAS_127)); @@ -413,15 +420,27 @@ private void allowedRequired(StringTest test, List strings) { } } + private void notAllowedRequired(StringTest test, List strings) { + for (String s : strings) { + assertThrows(IllegalArgumentException.class, () -> test.validate(s, true), notAllowedMessage(s)); + } + } + + private void allowedNotRequired(StringTest test, List strings) { + for (String s : strings) { + assertEquals(s, test.validate(s, false), allowedMessage(s)); + } + } + private void allowedNotRequiredEmptyAsNull(StringTest test, List strings) { for (String s : strings) { assertNull(test.validate(s, false), allowedMessage(s)); } } - private void notAllowedRequired(StringTest test, List strings) { + private void notAllowedNotRequired(StringTest test, List strings) { for (String s : strings) { - assertThrows(IllegalArgumentException.class, () -> test.validate(s, true), notAllowedMessage(s)); + assertThrows(IllegalArgumentException.class, () -> test.validate(s, false), notAllowedMessage(s)); } } @@ -526,29 +545,53 @@ public void testSemver() { @Test public void testListsAreEqual() { List l1 = Arrays.asList("one", "two"); - List l2 = Arrays.asList("one", "two"); - List l3 = Collections.singletonList("three"); - List l4 = null; + List l2 = Arrays.asList("two", "one"); + List l3 = Arrays.asList("one", "not"); + List l4 = Collections.singletonList("three"); List l5 = null; List l6 = new ArrayList<>(); - assertTrue(listsAreEqual(l1, l1, true)); - assertTrue(listsAreEqual(l1, l1, false)); - assertTrue(listsAreEqual(l1, l2, true)); - assertTrue(listsAreEqual(l1, l2, false)); - assertFalse(listsAreEqual(l1, l3, true)); - assertFalse(listsAreEqual(l1, l3, false)); - - assertFalse(listsAreEqual(l4, l1, true)); - assertFalse(listsAreEqual(l4, l1, false)); - - assertTrue(listsAreEqual(l4, l5, true)); - assertTrue(listsAreEqual(l4, l5, false)); - assertFalse(listsAreEqual(l4, l6, true)); - assertFalse(listsAreEqual(l4, l6, false)); - - assertTrue(listsAreEqual(l6, l4, true)); - assertFalse(listsAreEqual(l6, l4, false)); + assertTrue(listsAreEquivalent(l1, l1)); + assertTrue(listsAreEquivalent(l1, l2)); + assertFalse(listsAreEquivalent(l1, l3)); + assertFalse(listsAreEquivalent(l1, l4)); + assertFalse(listsAreEquivalent(l1, l5)); + assertFalse(listsAreEquivalent(l1, l6)); + + assertTrue(listsAreEquivalent(l2, l1)); + assertTrue(listsAreEquivalent(l2, l2)); + assertFalse(listsAreEquivalent(l2, l3)); + assertFalse(listsAreEquivalent(l2, l4)); + assertFalse(listsAreEquivalent(l2, l5)); + assertFalse(listsAreEquivalent(l2, l6)); + + assertFalse(listsAreEquivalent(l3, l1)); + assertFalse(listsAreEquivalent(l3, l2)); + assertTrue(listsAreEquivalent(l3, l3)); + assertFalse(listsAreEquivalent(l3, l4)); + assertFalse(listsAreEquivalent(l3, l5)); + assertFalse(listsAreEquivalent(l3, l6)); + + assertFalse(listsAreEquivalent(l4, l1)); + assertFalse(listsAreEquivalent(l4, l2)); + assertFalse(listsAreEquivalent(l4, l3)); + assertTrue(listsAreEquivalent(l4, l4)); + assertFalse(listsAreEquivalent(l4, l5)); + assertFalse(listsAreEquivalent(l4, l6)); + + assertFalse(listsAreEquivalent(l5, l1)); + assertFalse(listsAreEquivalent(l5, l2)); + assertFalse(listsAreEquivalent(l5, l3)); + assertFalse(listsAreEquivalent(l5, l4)); + assertTrue(listsAreEquivalent(l5, l5)); + assertTrue(listsAreEquivalent(l5, l6)); + + assertFalse(listsAreEquivalent(l6, l1)); + assertFalse(listsAreEquivalent(l6, l2)); + assertFalse(listsAreEquivalent(l6, l3)); + assertFalse(listsAreEquivalent(l6, l4)); + assertTrue(listsAreEquivalent(l6, l5)); + assertTrue(listsAreEquivalent(l6, l6)); } @Test @@ -562,37 +605,73 @@ public void testMapsAreEqual() { m2.put("one", "1"); Map m3 = new HashMap<>(); - m3.put("three", "3"); - m3.put("two", "4"); + m3.put("one", "1"); + m3.put("two", "not"); Map m4 = new HashMap<>(); - m3.put("four", "4"); + m4.put("one", "1"); + m4.put("not", "not"); + + Map m5 = new HashMap<>(); + m5.put("five", "5"); - Map m5 = null; Map m6 = null; Map m7 = new HashMap<>(); - assertTrue(mapsAreEqual(m1, m1, true)); - assertTrue(mapsAreEqual(m1, m1, false)); - assertTrue(mapsAreEqual(m1, m2, true)); - assertTrue(mapsAreEqual(m1, m2, false)); - assertFalse(mapsAreEqual(m1, m3, true)); - assertFalse(mapsAreEqual(m1, m3, false)); - assertFalse(mapsAreEqual(m1, m4, true)); - assertFalse(mapsAreEqual(m1, m4, false)); - assertFalse(mapsAreEqual(m1, m5, true)); - assertFalse(mapsAreEqual(m1, m5, false)); - - assertFalse(mapsAreEqual(m5, m1, true)); - assertFalse(mapsAreEqual(m5, m1, false)); - - assertTrue(mapsAreEqual(m5, m6, true)); - assertTrue(mapsAreEqual(m5, m6, false)); - assertFalse(mapsAreEqual(m5, m7, true)); - assertFalse(mapsAreEqual(m5, m7, false)); - - assertTrue(mapsAreEqual(m7, m5, true)); - assertFalse(mapsAreEqual(m7, m5, false)); + assertTrue(mapsAreEquivalent(m1, m1)); + assertTrue(mapsAreEquivalent(m1, m2)); + assertFalse(mapsAreEquivalent(m1, m3)); + assertFalse(mapsAreEquivalent(m1, m4)); + assertFalse(mapsAreEquivalent(m1, m5)); + assertFalse(mapsAreEquivalent(m1, m6)); + assertFalse(mapsAreEquivalent(m1, m7)); + + assertTrue(mapsAreEquivalent(m2, m1)); + assertTrue(mapsAreEquivalent(m2, m2)); + assertFalse(mapsAreEquivalent(m2, m3)); + assertFalse(mapsAreEquivalent(m2, m4)); + assertFalse(mapsAreEquivalent(m2, m5)); + assertFalse(mapsAreEquivalent(m2, m6)); + assertFalse(mapsAreEquivalent(m2, m7)); + + assertFalse(mapsAreEquivalent(m3, m1)); + assertFalse(mapsAreEquivalent(m3, m2)); + assertTrue(mapsAreEquivalent(m3, m3)); + assertFalse(mapsAreEquivalent(m3, m4)); + assertFalse(mapsAreEquivalent(m3, m5)); + assertFalse(mapsAreEquivalent(m3, m6)); + assertFalse(mapsAreEquivalent(m3, m7)); + + assertFalse(mapsAreEquivalent(m4, m1)); + assertFalse(mapsAreEquivalent(m4, m2)); + assertFalse(mapsAreEquivalent(m4, m3)); + assertTrue(mapsAreEquivalent(m4, m4)); + assertFalse(mapsAreEquivalent(m4, m5)); + assertFalse(mapsAreEquivalent(m4, m6)); + assertFalse(mapsAreEquivalent(m4, m7)); + + assertFalse(mapsAreEquivalent(m5, m1)); + assertFalse(mapsAreEquivalent(m5, m2)); + assertFalse(mapsAreEquivalent(m5, m3)); + assertFalse(mapsAreEquivalent(m5, m4)); + assertTrue(mapsAreEquivalent(m5, m5)); + assertFalse(mapsAreEquivalent(m5, m6)); + assertFalse(mapsAreEquivalent(m5, m7)); + + assertFalse(mapsAreEquivalent(m6, m1)); + assertFalse(mapsAreEquivalent(m6, m2)); + assertFalse(mapsAreEquivalent(m6, m3)); + assertFalse(mapsAreEquivalent(m6, m4)); + assertFalse(mapsAreEquivalent(m6, m5)); + assertTrue(mapsAreEquivalent(m6, m6)); + assertTrue(mapsAreEquivalent(m6, m7)); + + assertFalse(mapsAreEquivalent(m7, m1)); + assertFalse(mapsAreEquivalent(m7, m2)); + assertFalse(mapsAreEquivalent(m7, m3)); + assertFalse(mapsAreEquivalent(m7, m5)); + assertTrue(mapsAreEquivalent(m7, m6)); + assertTrue(mapsAreEquivalent(m7, m7)); } } \ No newline at end of file diff --git a/src/test/java/io/nats/client/utils/TestBase.java b/src/test/java/io/nats/client/utils/TestBase.java index 4fe2302d6..2143978b5 100644 --- a/src/test/java/io/nats/client/utils/TestBase.java +++ b/src/test/java/io/nats/client/utils/TestBase.java @@ -38,16 +38,22 @@ public class TestBase { + public static final String STAR_SEGMENT = "*.star.*.segment.*"; + public static final String GT_LAST_SEGMENT = "gt.last.>"; + public static final String STARTS_WITH_DOT = ".starts-with-dot"; + public static final String STAR_NOT_SEGMENT = "star*not*segment"; + public static final String GT_NOT_SEGMENT = "gt>not>segment"; + public static final String PLAIN = "plain"; public static final String HAS_SPACE = "has space"; public static final String HAS_PRINTABLE = "has-print!able"; public static final String HAS_DOT = "has.dot"; - public static final String HAS_STAR = "has*star"; - public static final String HAS_GT = "has>gt"; public static final String HAS_DASH = "has-dash"; public static final String HAS_UNDER = "has_under"; public static final String HAS_DOLLAR = "has$dollar"; - public static final String HAS_LOW = "has\tlower\rthan\nspace"; + public static final String HAS_CR = "has\rcr"; + public static final String HAS_LF = "has\nlf"; + public static final String HAS_LOW = "has" + (char)0 + "low"; public static final String HAS_127 = "has" + (char)127 + "127"; public static final String HAS_FWD_SLASH = "has/fwd/slash"; public static final String HAS_BACK_SLASH = "has\\back\\slash"; diff --git a/src/test/java/io/nats/service/ServiceTests.java b/src/test/java/io/nats/service/ServiceTests.java index db9b0a8b3..45494565a 100644 --- a/src/test/java/io/nats/service/ServiceTests.java +++ b/src/test/java/io/nats/service/ServiceTests.java @@ -537,8 +537,8 @@ public void testServiceBuilderConstruction() { assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_SPACE)); assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_PRINTABLE)); assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOT)); - assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_STAR)); // invalid in the middle - assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_GT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(STAR_NOT_SEGMENT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> Service.builder().name(GT_NOT_SEGMENT)); // invalid in the middle assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_DOLLAR)); assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_LOW)); assertThrows(IllegalArgumentException.class, () -> Service.builder().name(HAS_127)); @@ -744,14 +744,14 @@ public void testEndpointConstruction() { assertTrue(JsonUtils.mapEquals(metadata, e.getMetadata())); assertThrows(IllegalArgumentException.class, () -> Endpoint.builder().build()); - // many names are bad + // many names are bad and is required assertThrows(IllegalArgumentException.class, () -> new Endpoint((String) null)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(EMPTY)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_SPACE)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_PRINTABLE)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_DOT)); - assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_STAR)); // invalid in the middle - assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_GT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> new Endpoint(STAR_NOT_SEGMENT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> new Endpoint(GT_NOT_SEGMENT)); // invalid in the middle assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_DOLLAR)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_LOW)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_127)); @@ -760,11 +760,13 @@ public void testEndpointConstruction() { assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_EQUALS)); assertThrows(IllegalArgumentException.class, () -> new Endpoint(HAS_TIC)); - // fewer subjects are bad + // typical subjects are bad assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_SPACE)); - assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_LOW)); - assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_127)); - assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, "foo.>.bar")); // gt is not last segment + assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_CR)); + assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, HAS_LF)); + assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, STAR_NOT_SEGMENT)); + assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, GT_NOT_SEGMENT)); + assertThrows(IllegalArgumentException.class, () -> new Endpoint(NAME, STARTS_WITH_DOT)); } @Test @@ -847,11 +849,12 @@ public void testGroupConstruction() { g1 = new Group("foo.*"); assertEquals("foo.*", g1.getName()); + assertThrows(IllegalArgumentException.class, () -> new Group(null)); + assertThrows(IllegalArgumentException.class, () -> new Group(EMPTY)); assertThrows(IllegalArgumentException.class, () -> new Group(HAS_SPACE)); - assertThrows(IllegalArgumentException.class, () -> new Group(HAS_LOW)); - assertThrows(IllegalArgumentException.class, () -> new Group(HAS_127)); - assertThrows(IllegalArgumentException.class, () -> new Group("foo.>")); // gt is last segment - assertThrows(IllegalArgumentException.class, () -> new Group("foo.>.bar")); // gt is not last segment + assertThrows(IllegalArgumentException.class, () -> new Group(STAR_NOT_SEGMENT)); // invalid in the middle + assertThrows(IllegalArgumentException.class, () -> new Group("foo.>")); // GT invalid everywhere + assertThrows(IllegalArgumentException.class, () -> new Group(GT_NOT_SEGMENT)); // GT invalid everywhere EqualsVerifier.simple().forClass(Group.class).withPrefabValues(Group.class, g1, g2).verify(); }