diff --git a/src/main/java/io/nats/client/KeyValue.java b/src/main/java/io/nats/client/KeyValue.java index 93097461e..28bd71aba 100644 --- a/src/main/java/io/nats/client/KeyValue.java +++ b/src/main/java/io/nats/client/KeyValue.java @@ -168,8 +168,8 @@ public interface KeyValue { void purge(String key, long expectedRevision) throws IOException, JetStreamApiException; /** - * Watch updates for a specific key or keys. - * @param key the key or a comma delimited list of keys. + * Watch updates for a specific key. + * @param key the key. * @param watcher the watcher the implementation to receive changes * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. * @return The KeyValueWatchSubscription @@ -181,8 +181,8 @@ public interface KeyValue { NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException; /** - * Watch updates for a specific key or keys, starting at a specific revision. - * @param key the key or a comma delimited list of keys. + * Watch updates for a specific key, starting at a specific revision. + * @param key the key. * @param watcher the watcher the implementation to receive changes * @param fromRevision the revision to start from * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. diff --git a/src/main/java/io/nats/client/impl/NatsKeyValue.java b/src/main/java/io/nats/client/impl/NatsKeyValue.java index ed339fe7d..763d13635 100644 --- a/src/main/java/io/nats/client/impl/NatsKeyValue.java +++ b/src/main/java/io/nats/client/impl/NatsKeyValue.java @@ -25,7 +25,6 @@ import java.nio.charset.StandardCharsets; import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -239,28 +238,27 @@ private PublishAck _write(String key, byte[] data, Headers h) throws IOException @Override public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { - if (key.contains(",")) { - return watch(Arrays.asList(key.split(",")), watcher, -1, watchOptions); - } - return watch(Collections.singletonList(key), watcher, -1, watchOptions); + validateKvKeyWildcardAllowedRequired(key); + validateNotNull(watcher, "Watcher is required"); + return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, -1, watchOptions); } @Override public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { - if (key.contains(",")) { - return watch(Arrays.asList(key.split(",")), watcher, fromRevision, watchOptions); - } - return watch(Collections.singletonList(key), watcher, fromRevision, watchOptions); + validateKvKeyWildcardAllowedRequired(key); + validateNotNull(watcher, "Watcher is required"); + return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, fromRevision, watchOptions); } @Override public NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { - return watch(keys, watcher, -1, watchOptions); + validateKvKeysWildcardAllowedRequired(keys); + validateNotNull(watcher, "Watcher is required"); + return new NatsKeyValueWatchSubscription(this, keys, watcher, -1, watchOptions); } @Override public NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException { - // all watch methods (watch, watchAll) delegate to here validateKvKeysWildcardAllowedRequired(keys); validateNotNull(watcher, "Watcher is required"); return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions); diff --git a/src/test/java/io/nats/client/impl/KeyValueTests.java b/src/test/java/io/nats/client/impl/KeyValueTests.java index c5f536792..51d8828f7 100644 --- a/src/test/java/io/nats/client/impl/KeyValueTests.java +++ b/src/test/java/io/nats/client/impl/KeyValueTests.java @@ -914,8 +914,6 @@ public void testWatch() throws Exception { TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher("gtMetaWatcher", true, META_ONLY); TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher("multipleFullWatcher", true); TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher("multipleMetaWatcher", true, META_ONLY); - TestKeyValueWatcher multipleFullWatcher2 = new TestKeyValueWatcher("multipleFullWatcher2", true); - TestKeyValueWatcher multipleMetaWatcher2 = new TestKeyValueWatcher("multipleMetaWatcher2", true, META_ONLY); TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher("key1AfterWatcher", false, META_ONLY); TestKeyValueWatcher key1AfterIgDelWatcher = new TestKeyValueWatcher("key1AfterIgDelWatcher", false, META_ONLY, IGNORE_DELETE); TestKeyValueWatcher key1AfterStartNewWatcher = new TestKeyValueWatcher("key1AfterStartNewWatcher", false, META_ONLY, UPDATES_ONLY); @@ -943,10 +941,6 @@ public void testWatch() throws Exception { _testWatch(nc, starMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.*", starMetaWatcher, starMetaWatcher.watchOptions)); _testWatch(nc, gtFullWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtFullWatcher, gtFullWatcher.watchOptions)); _testWatch(nc, gtMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtMetaWatcher, gtMetaWatcher.watchOptions)); - _testWatch(nc, multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions)); - _testWatch(nc, multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions)); - _testWatch(nc, multipleFullWatcher2, allExpecteds, -1, kv -> kv.watch(String.join(",", allKeys), multipleFullWatcher2, multipleFullWatcher.watchOptions)); - _testWatch(nc, multipleMetaWatcher2, allExpecteds, -1, kv -> kv.watch(String.join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher.watchOptions)); _testWatch(nc, key1AfterWatcher, purgeOnlyExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterWatcher, key1AfterWatcher.watchOptions)); _testWatch(nc, key1AfterIgDelWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.watchOptions)); _testWatch(nc, key1AfterStartNewWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.watchOptions)); @@ -956,6 +950,11 @@ public void testWatch() throws Exception { _testWatch(nc, key2AfterStartFirstWatcher, key2AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2AfterStartFirstWatcher, key2AfterStartFirstWatcher.watchOptions)); _testWatch(nc, key1FromRevisionAfterWatcher, key1FromRevisionExpecteds, 2, kv -> kv.watch(TEST_WATCH_KEY_1, key1FromRevisionAfterWatcher, 2, key1FromRevisionAfterWatcher.watchOptions)); _testWatch(nc, allFromRevisionAfterWatcher, allFromRevisionExpecteds, 2, kv -> kv.watchAll(allFromRevisionAfterWatcher, 2, allFromRevisionAfterWatcher.watchOptions)); + + if (atLeast2_10()) { + _testWatch(nc, multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions)); + _testWatch(nc, multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions)); + } }); }