Skip to content

Commit

Permalink
KV Watch Multiple Filters (#887)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Apr 11, 2024
1 parent 58c66f1 commit 7470c56
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 12 deletions.
11 changes: 10 additions & 1 deletion src/NATS.Client/Internals/Validator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,16 @@ public static string ValidatePrefixOrDomain(string s, string label, bool require
});
}

public static string ValidateKvKeyWildcardAllowedRequired(string s) {
public static IList<String> ValidateKvKeysWildcardAllowedRequired(IList<String> keys) {
Required(keys, "Key");
foreach (string key in keys) {
ValidateWildcardKvKey(key, "Key", true);
}
return keys;
}

public static string ValidateKvKeyWildcardAllowedRequired(string s)
{
return ValidateWildcardKvKey(s, "Key", true);
}

Expand Down
23 changes: 21 additions & 2 deletions src/NATS.Client/KeyValue/IKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public interface IKeyValue
void Purge(string key, ulong expectedRevision);

/// <summary>
/// Watch updates for a specific key
/// Watch updates for a specific key.
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
Expand All @@ -117,7 +117,7 @@ public interface IKeyValue
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific key, starting from a specific revision
/// Watch updates for a specific key, starting from a specific revision.
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
Expand All @@ -126,6 +126,25 @@ public interface IKeyValue
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific keys.
/// </summary>
/// <param name="keys">the keys</param>
/// <param name="watcher">the watcher</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific keys, starting from a specific revision.
/// </summary>
/// <param name="keys">the keys</param>
/// <param name="watcher">the watcher</param>
/// <param name="fromRevision">the revision to start from</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys
/// </summary>
Expand Down
23 changes: 19 additions & 4 deletions src/NATS.Client/KeyValue/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using NATS.Client.Internals;
Expand Down Expand Up @@ -177,26 +178,40 @@ public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, par
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {key}, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {key}, watcher, fromRevision, watchOptions);
}

public KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeysWildcardAllowedRequired(keys);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, keys, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription Watch(IList<string> keys, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeysWildcardAllowedRequired(keys);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {">"}, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions);
return new KeyValueWatchSubscription(this, new List<string> {">"}, watcher, fromRevision, watchOptions);
}

private PublishAck _write(string key, byte[] data, MsgHeader h) {
Expand Down
15 changes: 10 additions & 5 deletions src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals;
using NATS.Client.JetStream;

Expand All @@ -23,11 +24,15 @@ public class KeyValueWatchSubscription : IDisposable
private readonly InterlockedBoolean endOfDataSent;
private readonly object subLock;

public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
public KeyValueWatchSubscription(KeyValue kv, IList<string> keyPatterns,
IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
subLock = new object();
string subscribeSubject = kv.ReadSubject(keyPattern);
IList<string> subscribeSubjects = new List<string>();
foreach (string keyPattern in keyPatterns)
{
subscribeSubjects.Add(kv.ReadSubject(keyPattern));
}

// figure out the result options
bool headersOnly = false;
Expand All @@ -52,7 +57,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
else
{
fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision
if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
if (deliverPolicy == DeliverPolicy.New)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
Expand All @@ -72,7 +77,7 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
.WithDeliverPolicy(deliverPolicy)
.WithStartSequence(fromRevision)
.WithHeadersOnly(headersOnly)
.WithFilterSubject(subscribeSubject)
.WithFilterSubjects(subscribeSubjects)
.Build())
.Build();

Expand All @@ -91,7 +96,7 @@ void Handler(object sender, MsgHandlerEventArgs args)
}
}

sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso);
sub = kv.js.PushSubscribeAsync(null, Handler, false, pso);
if (endOfDataSent.IsFalse())
{
ulong pending = sub.GetConsumerInformation().CalculatedPending;
Expand Down
10 changes: 10 additions & 0 deletions src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,8 @@ public void TestWatch()
object[] allPutsExpecteds = {
"a", "aa", "z", "zz", "aaa", "zzz", null
};

IList<string> allKeys = new List<string> {key1, key2, keyNull};

Context.RunInJsServer(c =>
{
Expand Down Expand Up @@ -899,6 +901,8 @@ public void TestWatch()
TestKeyValueWatcher starMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
TestKeyValueWatcher gtFullWatcher = new TestKeyValueWatcher(true);
TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher(true);
TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
IList<KeyValueWatchSubscription> subs = new List<KeyValueWatchSubscription>();
subs.Add(kv.Watch(key1, key1FullWatcher, key1FullWatcher.WatchOptions));
Expand All @@ -916,6 +920,12 @@ public void TestWatch()
subs.Add(kv.Watch("key.>", gtFullWatcher, gtFullWatcher.WatchOptions));
subs.Add(kv.Watch("key.>", gtMetaWatcher, gtMetaWatcher.WatchOptions));
if (AtLeast2_10(c.ServerInfo))
{
subs.Add(kv.Watch(allKeys, multipleFullWatcher, multipleFullWatcher.WatchOptions));
subs.Add(kv.Watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.WatchOptions));
}
kv.Put(key1, "a");
kv.Put(key1, "aa");
kv.Put(key2, "z");
Expand Down

0 comments on commit 7470c56

Please sign in to comment.