diff --git a/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs b/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs
index f81821370..5f96983c0 100644
--- a/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs
+++ b/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs
@@ -11,6 +11,7 @@ internal static class NatsObjJsonSerializer
[JsonSerializable(typeof(ObjectMetadata))]
[JsonSerializable(typeof(MetaDataOptions))]
+[JsonSerializable(typeof(NatsObjLink))]
internal partial class NatsObjJsonSerializerContext : JsonSerializerContext
{
}
diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
index a58928470..24fc6c82f 100644
--- a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
+++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
@@ -81,15 +81,39 @@ public record ObjectMetadata
///
[JsonPropertyName("options")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
- public MetaDataOptions Options { get; set; } = default!;
+ public MetaDataOptions? Options { get; set; } = default!;
}
public record MetaDataOptions
{
+ ///
+ /// Link
+ ///
+ [JsonPropertyName("link")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public NatsObjLink? Link { get; set; } = default!;
+
///
/// Max chunk size
///
[JsonPropertyName("max_chunk_size")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
- public int MaxChunkSize { get; set; } = default!;
+ public int? MaxChunkSize { get; set; } = default!;
+}
+
+public record NatsObjLink
+{
+ ///
+ /// Link name
+ ///
+ [JsonPropertyName("name")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string Name { get; set; } = default!;
+
+ ///
+ /// Bucket name
+ ///
+ [JsonPropertyName("bucket")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string Bucket { get; set; } = default!;
}
diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs
index 144b63c45..f3c1087c7 100644
--- a/src/NATS.Client.ObjectStore/NatsObjContext.cs
+++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs
@@ -62,14 +62,14 @@ public async ValueTask CreateObjectStore(NatsObjConfig config, Can
};
var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
- return new NatsObjStore(config, _context, stream);
+ return new NatsObjStore(config, this, _context, stream);
}
public async ValueTask GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucket);
var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken);
- return new NatsObjStore(new NatsObjConfig(bucket), _context, stream);
+ return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream);
}
///
diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs
index 8a119c1f0..5808d7896 100644
--- a/src/NATS.Client.ObjectStore/NatsObjStore.cs
+++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs
@@ -1,5 +1,7 @@
using System.Buffers;
+using System.Runtime.CompilerServices;
using System.Security.Cryptography;
+using System.Text.Json;
using System.Text.RegularExpressions;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
@@ -21,15 +23,18 @@ public class NatsObjStore
private const string RollupSubject = "sub";
private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };
+
private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
private readonly string _bucket;
+ private readonly NatsObjContext _objContext;
private readonly NatsJSContext _context;
private readonly NatsJSStream _stream;
- internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream)
+ internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, NatsJSStream stream)
{
_bucket = config.Bucket;
+ _objContext = objContext;
_context = context;
_stream = stream;
}
@@ -62,11 +67,20 @@ public async ValueTask GetAsync(string key, Stream stream, bool
var info = await GetInfoAsync(key, cancellationToken: cancellationToken);
+ if (info.Options?.Link is { } link)
+ {
+ var store = await _objContext.GetObjectStoreAsync(link.Bucket, cancellationToken).ConfigureAwait(false);
+ return await store.GetAsync(link.Name, stream, leaveOpen, cancellationToken).ConfigureAwait(false);
+ }
+
await using var pushConsumer = new NatsJSOrderedPushConsumer>(
_context,
$"OBJ_{_bucket}",
GetChunkSubject(info.Nuid),
- new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all },
+ new NatsJSOrderedPushConsumerOpts
+ {
+ DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
+ },
new NatsSubOpts(),
cancellationToken);
@@ -180,19 +194,19 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre
meta.Nuid = nuid;
meta.MTime = DateTimeOffset.UtcNow;
- if (meta.Options == null!)
+ meta.Options ??= new MetaDataOptions
{
- meta.Options = new MetaDataOptions { MaxChunkSize = DefaultChunkSize };
- }
+ MaxChunkSize = DefaultChunkSize,
+ };
- if (meta.Options.MaxChunkSize == 0)
+ if (meta.Options.MaxChunkSize is null or <= 0)
{
meta.Options.MaxChunkSize = DefaultChunkSize;
}
var size = 0;
var chunks = 0;
- var chunkSize = meta.Options.MaxChunkSize;
+ var chunkSize = meta.Options.MaxChunkSize!.Value;
string digest;
using (var sha256 = SHA256.Create())
@@ -266,7 +280,10 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre
{
await _context.JSRequestResponseAsync(
subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}",
- request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) },
+ request: new StreamPurgeRequest
+ {
+ Filter = GetChunkSubject(info.Nuid),
+ },
cancellationToken);
}
catch (NatsJSApiException e)
@@ -308,6 +325,53 @@ public async ValueTask UpdateMetaAsync(string key, ObjectMetadat
return info;
}
+ public async ValueTask AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default)
+ {
+ ValidateObjectName(link);
+ ValidateObjectName(target.Name);
+
+ if (target.Deleted)
+ {
+ throw new NatsObjException("Can't link to a deleted object");
+ }
+
+ if (target.Options?.Link is not null)
+ {
+ throw new NatsObjException("Can't link to a linked object");
+ }
+
+ try
+ {
+ var checkLink = await GetInfoAsync(link, showDeleted: true, cancellationToken: cancellationToken).ConfigureAwait(false);
+ if (checkLink.Options?.Link is null)
+ {
+ throw new NatsObjException("Object already exists");
+ }
+ }
+ catch (NatsObjNotFoundException)
+ {
+ }
+
+ var info = new ObjectMetadata
+ {
+ Name = link,
+ Bucket = _bucket,
+ Nuid = NewNuid(),
+ Options = new MetaDataOptions
+ {
+ Link = new NatsObjLink
+ {
+ Name = target.Name,
+ Bucket = target.Bucket,
+ },
+ },
+ };
+
+ await PublishMeta(info, cancellationToken);
+
+ return info;
+ }
+
///
/// Get object metadata by key.
///
@@ -320,12 +384,16 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted
{
ValidateObjectName(key);
- var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) };
+ var request = new StreamMsgGetRequest
+ {
+ LastBySubj = GetMetaSubject(key),
+ };
try
{
var response = await _stream.GetAsync(request, cancellationToken);
- var data = NatsObjJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");
+ var base64String = Convert.FromBase64String(response.Message.Data);
+ var data = NatsObjJsonSerializer.Default.Deserialize(new ReadOnlySequence(base64String)) ?? throw new NatsObjException("Can't deserialize object metadata");
if (!showDeleted && data.Deleted)
{
@@ -345,6 +413,44 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted
}
}
+ public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ opts ??= new NatsObjWatchOpts();
+
+ var deliverPolicy = ConsumerConfigurationDeliverPolicy.all;
+ if (opts.UpdatesOnly)
+ {
+ deliverPolicy = ConsumerConfigurationDeliverPolicy.@new;
+ }
+
+ await using var pushConsumer = new NatsJSOrderedPushConsumer>(
+ _context,
+ $"OBJ_{_bucket}",
+ $"$O.{_bucket}.M.>",
+ new NatsJSOrderedPushConsumerOpts
+ {
+ DeliverPolicy = deliverPolicy,
+ },
+ new NatsSubOpts(),
+ cancellationToken);
+
+ pushConsumer.Init();
+
+ await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken))
+ {
+ if (pushConsumer.IsDone)
+ continue;
+ using (msg.Data)
+ {
+ var info = JsonSerializer.Deserialize(msg.Data.Memory.Span, NatsObjJsonSerializerContext.Default.ObjectMetadata);
+ if (info != null)
+ {
+ yield return info;
+ }
+ }
+ }
+ }
+
///
/// Delete an object by key.
///
@@ -411,3 +517,8 @@ private void ValidateObjectName(string name)
}
}
}
+
+public record NatsObjWatchOpts
+{
+ public bool UpdatesOnly { get; init; }
+}
diff --git a/tests/Nats.Client.Compat/ObjectStoreCompat.cs b/tests/Nats.Client.Compat/ObjectStoreCompat.cs
index 63a7d8e50..e12e8d5cd 100644
--- a/tests/Nats.Client.Compat/ObjectStoreCompat.cs
+++ b/tests/Nats.Client.Compat/ObjectStoreCompat.cs
@@ -4,6 +4,9 @@
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;
using NATS.Client.ObjectStore.Models;
+
+// ReSharper disable UnusedMember.Global
+// ReSharper disable UnusedType.Global
#pragma warning disable CS8602 // Dereference of a possibly null reference.
namespace Nats.Client.Compat;
@@ -115,31 +118,74 @@ public async Task TestWatchUpdates(NatsConnection nats, NatsMsg> ms
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
- Test.Log($"JSON: {json}");
- await msg.ReplyAsync();
+
+ // Test.Log($"JSON: {json}");
+ var bucket = json["bucket"].GetValue();
+
+ var store = await ob.GetObjectStoreAsync(bucket);
+
+ await foreach (var info in store.WatchAsync(new NatsObjWatchOpts { UpdatesOnly = true }))
+ {
+ await msg.ReplyAsync(info.Digest);
+ break;
+ }
}
public async Task TestWatch(NatsConnection nats, NatsMsg> msg)
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
- Test.Log($"JSON: {json}");
- await msg.ReplyAsync();
+
+ // Test.Log($"JSON: {json}");
+ var bucket = json["bucket"].GetValue();
+
+ var store = await ob.GetObjectStoreAsync(bucket);
+
+ var list = new List();
+ await foreach (var info in store.WatchAsync())
+ {
+ list.Add(info.Digest);
+ if (list.Count == 2)
+ break;
+ }
+
+ await msg.ReplyAsync($"{list[0]},{list[1]}");
}
public async Task TestGetLink(NatsConnection nats, NatsMsg> msg)
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
- Test.Log($"JSON: {json}");
- await msg.ReplyAsync();
+
+ // Test.Log($"JSON: {json}");
+ var bucket = json["bucket"].GetValue();
+ var objectName = json["object"].GetValue();
+
+ var store = await ob.GetObjectStoreAsync(bucket);
+
+ var bytes = await store.GetBytesAsync(objectName);
+
+ var sha256 = SHA256.HashData(bytes);
+
+ await msg.ReplyAsync(sha256);
}
public async Task TestPutLink(NatsConnection nats, NatsMsg> msg)
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
- Test.Log($"JSON: {json}");
+
+ // Test.Log($"JSON: {json}");
+ var bucket = json["bucket"].GetValue();
+ var objectName = json["object"].GetValue();
+ var linkName = json["link_name"].GetValue();
+
+ var store = await ob.GetObjectStoreAsync(bucket);
+
+ var target = await store.GetInfoAsync(objectName);
+
+ await store.AddLinkAsync(linkName, target);
+
await msg.ReplyAsync();
}
}
diff --git a/tests/Nats.Client.Compat/Test.cs b/tests/Nats.Client.Compat/Test.cs
index f568d0fdf..939009cf6 100644
--- a/tests/Nats.Client.Compat/Test.cs
+++ b/tests/Nats.Client.Compat/Test.cs
@@ -1,4 +1,4 @@
-using System.Reflection;
+using System.Reflection;
using NATS.Client.Core;
namespace Nats.Client.Compat;
@@ -38,10 +38,15 @@ public static async Task RunAsync()
}
else if (action == "command")
{
+ // We turn the suite name to a class name suffixed by 'Compat' and locate that class.
+ // For example suite 'object-store' becomes class name 'ObjectStoreCompat'
+ // which is our class containing those tests.
var typeName = typeof(Test).Namespace + "." + suite.Replace("-", string.Empty) + "compat";
var type = typeof(Test).Assembly.GetType(typeName, true, true);
var instance = Activator.CreateInstance(type!);
+ // Transform the test name to a method name prefixed by 'Test'
+ // so the test 'default-bucket' matches the method 'TestDefaultBucket'
var methodName = "test" + test.Replace("-", string.Empty);
var method = type!.GetMethod(methodName, BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);