Skip to content

Commit

Permalink
Completed object store compat
Browse files Browse the repository at this point in the history
* Added Object Store Watch
* Added Link features
* Metadata fixes
  • Loading branch information
mtmk committed Nov 1, 2023
1 parent 0559173 commit 2f579b8
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal static class NatsObjJsonSerializer

[JsonSerializable(typeof(ObjectMetadata))]
[JsonSerializable(typeof(MetaDataOptions))]
[JsonSerializable(typeof(NatsObjLink))]
internal partial class NatsObjJsonSerializerContext : JsonSerializerContext
{
}
28 changes: 26 additions & 2 deletions src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,39 @@ public record ObjectMetadata
/// </summary>
[JsonPropertyName("options")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public MetaDataOptions Options { get; set; } = default!;
public MetaDataOptions? Options { get; set; } = default!;
}

public record MetaDataOptions
{
/// <summary>
/// Link
/// </summary>
[JsonPropertyName("link")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public NatsObjLink? Link { get; set; } = default!;

/// <summary>
/// Max chunk size
/// </summary>
[JsonPropertyName("max_chunk_size")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int MaxChunkSize { get; set; } = default!;
public int? MaxChunkSize { get; set; } = default!;
}

public record NatsObjLink
{
/// <summary>
/// Link name
/// </summary>
[JsonPropertyName("name")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string Name { get; set; } = default!;

/// <summary>
/// Bucket name
/// </summary>
[JsonPropertyName("bucket")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string Bucket { get; set; } = default!;
}
4 changes: 2 additions & 2 deletions src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ public async ValueTask<NatsObjStore> CreateObjectStore(NatsObjConfig config, Can
};

var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
return new NatsObjStore(config, _context, stream);
return new NatsObjStore(config, this, _context, stream);
}

public async ValueTask<NatsObjStore> GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucket);
var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken);
return new NatsObjStore(new NatsObjConfig(bucket), _context, stream);
return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream);
}

/// <summary>
Expand Down
131 changes: 121 additions & 10 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text.Json;
using System.Text.RegularExpressions;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
Expand All @@ -21,15 +23,18 @@ public class NatsObjStore
private const string RollupSubject = "sub";

private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };

private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);

private readonly string _bucket;
private readonly NatsObjContext _objContext;
private readonly NatsJSContext _context;
private readonly NatsJSStream _stream;

internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream)
internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, NatsJSStream stream)
{
_bucket = config.Bucket;
_objContext = objContext;
_context = context;
_stream = stream;
}
Expand Down Expand Up @@ -62,11 +67,20 @@ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool

var info = await GetInfoAsync(key, cancellationToken: cancellationToken);

if (info.Options?.Link is { } link)
{
var store = await _objContext.GetObjectStoreAsync(link.Bucket, cancellationToken).ConfigureAwait(false);
return await store.GetAsync(link.Name, stream, leaveOpen, cancellationToken).ConfigureAwait(false);
}

await using var pushConsumer = new NatsJSOrderedPushConsumer<IMemoryOwner<byte>>(
_context,
$"OBJ_{_bucket}",
GetChunkSubject(info.Nuid),
new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all },
new NatsJSOrderedPushConsumerOpts
{
DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
},
new NatsSubOpts(),
cancellationToken);

Expand Down Expand Up @@ -180,19 +194,19 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
meta.Nuid = nuid;
meta.MTime = DateTimeOffset.UtcNow;

if (meta.Options == null!)
meta.Options ??= new MetaDataOptions
{
meta.Options = new MetaDataOptions { MaxChunkSize = DefaultChunkSize };
}
MaxChunkSize = DefaultChunkSize,
};

if (meta.Options.MaxChunkSize == 0)
if (meta.Options.MaxChunkSize is null or <= 0)
{
meta.Options.MaxChunkSize = DefaultChunkSize;
}

var size = 0;
var chunks = 0;
var chunkSize = meta.Options.MaxChunkSize;
var chunkSize = meta.Options.MaxChunkSize!.Value;

string digest;
using (var sha256 = SHA256.Create())
Expand Down Expand Up @@ -266,7 +280,10 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
{
await _context.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}",
request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) },
request: new StreamPurgeRequest
{
Filter = GetChunkSubject(info.Nuid),
},
cancellationToken);
}
catch (NatsJSApiException e)
Expand Down Expand Up @@ -308,6 +325,53 @@ public async ValueTask<ObjectMetadata> UpdateMetaAsync(string key, ObjectMetadat
return info;
}

public async ValueTask<ObjectMetadata> AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default)
{
ValidateObjectName(link);
ValidateObjectName(target.Name);

if (target.Deleted)
{
throw new NatsObjException("Can't link to a deleted object");
}

if (target.Options?.Link is not null)
{
throw new NatsObjException("Can't link to a linked object");
}

try
{
var checkLink = await GetInfoAsync(link, showDeleted: true, cancellationToken: cancellationToken).ConfigureAwait(false);
if (checkLink.Options?.Link is null)
{
throw new NatsObjException("Object already exists");
}
}
catch (NatsObjNotFoundException)
{
}

var info = new ObjectMetadata
{
Name = link,
Bucket = _bucket,
Nuid = NewNuid(),
Options = new MetaDataOptions
{
Link = new NatsObjLink
{
Name = target.Name,
Bucket = target.Bucket,
},
},
};

await PublishMeta(info, cancellationToken);

return info;
}

/// <summary>
/// Get object metadata by key.
/// </summary>
Expand All @@ -320,12 +384,16 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
{
ValidateObjectName(key);

var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) };
var request = new StreamMsgGetRequest
{
LastBySubj = GetMetaSubject(key),
};
try
{
var response = await _stream.GetAsync(request, cancellationToken);

var data = NatsObjJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");
var base64String = Convert.FromBase64String(response.Message.Data);
var data = NatsObjJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(base64String)) ?? throw new NatsObjException("Can't deserialize object metadata");

if (!showDeleted && data.Deleted)
{
Expand All @@ -345,6 +413,44 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
}
}

public async IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
opts ??= new NatsObjWatchOpts();

var deliverPolicy = ConsumerConfigurationDeliverPolicy.all;
if (opts.UpdatesOnly)
{
deliverPolicy = ConsumerConfigurationDeliverPolicy.@new;
}

await using var pushConsumer = new NatsJSOrderedPushConsumer<NatsMemoryOwner<byte>>(
_context,
$"OBJ_{_bucket}",
$"$O.{_bucket}.M.>",
new NatsJSOrderedPushConsumerOpts
{
DeliverPolicy = deliverPolicy,
},
new NatsSubOpts(),
cancellationToken);

pushConsumer.Init();

await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken))
{
if (pushConsumer.IsDone)
continue;
using (msg.Data)
{
var info = JsonSerializer.Deserialize(msg.Data.Memory.Span, NatsObjJsonSerializerContext.Default.ObjectMetadata);
if (info != null)
{
yield return info;
}
}
}
}

/// <summary>
/// Delete an object by key.
/// </summary>
Expand Down Expand Up @@ -411,3 +517,8 @@ private void ValidateObjectName(string name)
}
}
}

public record NatsObjWatchOpts
{
public bool UpdatesOnly { get; init; }
}
60 changes: 53 additions & 7 deletions tests/Nats.Client.Compat/ObjectStoreCompat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;
using NATS.Client.ObjectStore.Models;

// ReSharper disable UnusedMember.Global
// ReSharper disable UnusedType.Global
#pragma warning disable CS8602 // Dereference of a possibly null reference.

namespace Nats.Client.Compat;
Expand Down Expand Up @@ -115,31 +118,74 @@ public async Task TestWatchUpdates(NatsConnection nats, NatsMsg<Memory<byte>> ms
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
Test.Log($"JSON: {json}");
await msg.ReplyAsync();

// Test.Log($"JSON: {json}");
var bucket = json["bucket"].GetValue<string>();

var store = await ob.GetObjectStoreAsync(bucket);

await foreach (var info in store.WatchAsync(new NatsObjWatchOpts { UpdatesOnly = true }))
{
await msg.ReplyAsync(info.Digest);
break;
}
}

public async Task TestWatch(NatsConnection nats, NatsMsg<Memory<byte>> msg)
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
Test.Log($"JSON: {json}");
await msg.ReplyAsync();

// Test.Log($"JSON: {json}");
var bucket = json["bucket"].GetValue<string>();

var store = await ob.GetObjectStoreAsync(bucket);

var list = new List<string>();
await foreach (var info in store.WatchAsync())
{
list.Add(info.Digest);
if (list.Count == 2)
break;
}

await msg.ReplyAsync($"{list[0]},{list[1]}");
}

public async Task TestGetLink(NatsConnection nats, NatsMsg<Memory<byte>> msg)
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
Test.Log($"JSON: {json}");
await msg.ReplyAsync();

// Test.Log($"JSON: {json}");
var bucket = json["bucket"].GetValue<string>();
var objectName = json["object"].GetValue<string>();

var store = await ob.GetObjectStoreAsync(bucket);

var bytes = await store.GetBytesAsync(objectName);

var sha256 = SHA256.HashData(bytes);

await msg.ReplyAsync(sha256);
}

public async Task TestPutLink(NatsConnection nats, NatsMsg<Memory<byte>> msg)
{
var ob = new NatsObjContext(new NatsJSContext(nats));
var json = JsonNode.Parse(msg.Data.Span);
Test.Log($"JSON: {json}");

// Test.Log($"JSON: {json}");
var bucket = json["bucket"].GetValue<string>();
var objectName = json["object"].GetValue<string>();
var linkName = json["link_name"].GetValue<string>();

var store = await ob.GetObjectStoreAsync(bucket);

var target = await store.GetInfoAsync(objectName);

await store.AddLinkAsync(linkName, target);

await msg.ReplyAsync();
}
}
7 changes: 6 additions & 1 deletion tests/Nats.Client.Compat/Test.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Reflection;
using System.Reflection;
using NATS.Client.Core;

namespace Nats.Client.Compat;
Expand Down Expand Up @@ -38,10 +38,15 @@ public static async Task RunAsync()
}
else if (action == "command")
{
// We turn the suite name to a class name suffixed by 'Compat' and locate that class.
// For example suite 'object-store' becomes class name 'ObjectStoreCompat'
// which is our class containing those tests.
var typeName = typeof(Test).Namespace + "." + suite.Replace("-", string.Empty) + "compat";
var type = typeof(Test).Assembly.GetType(typeName, true, true);
var instance = Activator.CreateInstance(type!);

// Transform the test name to a method name prefixed by 'Test'
// so the test 'default-bucket' matches the method 'TestDefaultBucket'
var methodName = "test" + test.Replace("-", string.Empty);
var method = type!.GetMethod(methodName, BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase);

Expand Down

0 comments on commit 2f579b8

Please sign in to comment.