Skip to content

Commit

Permalink
Client compatibility tests (#159)
Browse files Browse the repository at this point in the history
* Object Store client compatibility tests

* Object store compat tests put/get/update

* Completed object store compat

* Added Object Store Watch
* Added Link features
* Metadata fixes

* Service compat tests and fixes
  • Loading branch information
mtmk authored Nov 3, 2023
1 parent a000cb0 commit be058d5
Show file tree
Hide file tree
Showing 18 changed files with 669 additions and 54 deletions.
7 changes: 7 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ObjectStore", "sandbox\Example.ObjectStore\Example.ObjectStore.csproj", "{51882883-A66E-4F95-A1AB-CFCBF71B4376}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nats.Client.Compat", "tests\Nats.Client.Compat\Nats.Client.Compat.csproj", "{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services", "src\NATS.Client.Services\NATS.Client.Services.csproj", "{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services.Tests", "tests\NATS.Client.Services.Tests\NATS.Client.Services.Tests.csproj", "{749CAE39-4C1E-4627-9E31-A36B987BC453}"
Expand Down Expand Up @@ -217,6 +219,10 @@ Global
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.Build.0 = Debug|Any CPU
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.ActiveCfg = Release|Any CPU
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.Build.0 = Release|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Debug|Any CPU.Build.0 = Debug|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Release|Any CPU.ActiveCfg = Release|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Release|Any CPU.Build.0 = Release|Any CPU
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -284,6 +290,7 @@ Global
{3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{51882883-A66E-4F95-A1AB-CFCBF71B4376} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{749CAE39-4C1E-4627-9E31-A36B987BC453} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F} = {95A69671-16CA-4133-981C-CC381B7AAA30}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal static class NatsObjJsonSerializer

[JsonSerializable(typeof(ObjectMetadata))]
[JsonSerializable(typeof(MetaDataOptions))]
[JsonSerializable(typeof(NatsObjLink))]
internal partial class NatsObjJsonSerializerContext : JsonSerializerContext
{
}
45 changes: 41 additions & 4 deletions src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ public record ObjectMetadata
[JsonPropertyName("name")]
public string Name { get; set; } = default!;

/// <summary>
/// Object description
/// </summary>
[JsonPropertyName("description")]
public string Description { get; set; } = default!;

/// <summary>
/// Bucket name
/// </summary>
Expand Down Expand Up @@ -52,9 +58,16 @@ public record ObjectMetadata
/// <summary>
/// Object metadata
/// </summary>
[JsonPropertyName("meta")]
[JsonPropertyName("metadata")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public Dictionary<string, string> Metadata { get; set; } = default!;

/// <summary>
/// Object metadata
/// </summary>
[JsonPropertyName("headers")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public Dictionary<string, string> Meta { get; set; } = default!;
public Dictionary<string, string> Headers { get; set; } = default!;

/// <summary>
/// Object deleted
Expand All @@ -68,15 +81,39 @@ public record ObjectMetadata
/// </summary>
[JsonPropertyName("options")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public MetaDataOptions Options { get; set; } = default!;
public MetaDataOptions? Options { get; set; } = default!;
}

public record MetaDataOptions
{
/// <summary>
/// Link
/// </summary>
[JsonPropertyName("link")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public NatsObjLink? Link { get; set; } = default!;

/// <summary>
/// Max chunk size
/// </summary>
[JsonPropertyName("max_chunk_size")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int MaxChunkSize { get; set; } = default!;
public int? MaxChunkSize { get; set; } = default!;
}

public record NatsObjLink
{
/// <summary>
/// Link name
/// </summary>
[JsonPropertyName("name")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string Name { get; set; } = default!;

/// <summary>
/// Bucket name
/// </summary>
[JsonPropertyName("bucket")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string Bucket { get; set; } = default!;
}
2 changes: 1 addition & 1 deletion src/NATS.Client.ObjectStore/NatsObjConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public record NatsObjConfig(string Bucket)
/// <summary>
/// Type of backing storage to use.
/// </summary>
public NatsObjStorageType? Storage { get; init; }
public NatsObjStorageType Storage { get; init; }

/// <summary>
/// How many replicas to keep for each key.
Expand Down
10 changes: 9 additions & 1 deletion src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,18 @@ public async ValueTask<NatsObjStore> CreateObjectStore(NatsObjConfig config, Can
AllowDirect = true,
Metadata = config.Metadata!,
Retention = StreamConfigurationRetention.limits,
Compression = StreamConfigurationCompression.none,
};

var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
return new NatsObjStore(config, _context, stream);
return new NatsObjStore(config, this, _context, stream);
}

public async ValueTask<NatsObjStore> GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucket);
var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken);
return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream);
}

/// <summary>
Expand Down
160 changes: 150 additions & 10 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text.Json;
using System.Text.RegularExpressions;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
Expand All @@ -21,15 +23,18 @@ public class NatsObjStore
private const string RollupSubject = "sub";

private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };

private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);

private readonly string _bucket;
private readonly NatsObjContext _objContext;
private readonly NatsJSContext _context;
private readonly NatsJSStream _stream;

internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream)
internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, NatsJSStream stream)
{
_bucket = config.Bucket;
_objContext = objContext;
_context = context;
_stream = stream;
}
Expand Down Expand Up @@ -62,11 +67,20 @@ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool

var info = await GetInfoAsync(key, cancellationToken: cancellationToken);

if (info.Options?.Link is { } link)
{
var store = await _objContext.GetObjectStoreAsync(link.Bucket, cancellationToken).ConfigureAwait(false);
return await store.GetAsync(link.Name, stream, leaveOpen, cancellationToken).ConfigureAwait(false);
}

await using var pushConsumer = new NatsJSOrderedPushConsumer<IMemoryOwner<byte>>(
_context,
$"OBJ_{_bucket}",
GetChunkSubject(info.Nuid),
new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all },
new NatsJSOrderedPushConsumerOpts
{
DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
},
new NatsSubOpts(),
cancellationToken);

Expand Down Expand Up @@ -180,19 +194,19 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
meta.Nuid = nuid;
meta.MTime = DateTimeOffset.UtcNow;

if (meta.Options == null!)
meta.Options ??= new MetaDataOptions
{
meta.Options = new MetaDataOptions { MaxChunkSize = DefaultChunkSize };
}
MaxChunkSize = DefaultChunkSize,
};

if (meta.Options.MaxChunkSize == 0)
if (meta.Options.MaxChunkSize is null or <= 0)
{
meta.Options.MaxChunkSize = DefaultChunkSize;
}

var size = 0;
var chunks = 0;
var chunkSize = meta.Options.MaxChunkSize;
var chunkSize = meta.Options.MaxChunkSize!.Value;

string digest;
using (var sha256 = SHA256.Create())
Expand Down Expand Up @@ -266,7 +280,10 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
{
await _context.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}",
request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) },
request: new StreamPurgeRequest
{
Filter = GetChunkSubject(info.Nuid),
},
cancellationToken);
}
catch (NatsJSApiException e)
Expand All @@ -279,6 +296,82 @@ await _context.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
return meta;
}

public async ValueTask<ObjectMetadata> UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default)
{
ValidateObjectName(meta.Name);

var info = await GetInfoAsync(key, cancellationToken: cancellationToken).ConfigureAwait(false);

if (key != meta.Name)
{
// Make sure the new name is available
try
{
await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
throw new NatsObjException($"Object already exists: {meta.Name}");
}
catch (NatsObjNotFoundException)
{
}
}

info.Name = meta.Name;
info.Description = meta.Description;
info.Metadata = meta.Metadata;
info.Headers = meta.Headers;

await PublishMeta(info, cancellationToken);

return info;
}

public async ValueTask<ObjectMetadata> AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default)
{
ValidateObjectName(link);
ValidateObjectName(target.Name);

if (target.Deleted)
{
throw new NatsObjException("Can't link to a deleted object");
}

if (target.Options?.Link is not null)
{
throw new NatsObjException("Can't link to a linked object");
}

try
{
var checkLink = await GetInfoAsync(link, showDeleted: true, cancellationToken: cancellationToken).ConfigureAwait(false);
if (checkLink.Options?.Link is null)
{
throw new NatsObjException("Object already exists");
}
}
catch (NatsObjNotFoundException)
{
}

var info = new ObjectMetadata
{
Name = link,
Bucket = _bucket,
Nuid = NewNuid(),
Options = new MetaDataOptions
{
Link = new NatsObjLink
{
Name = target.Name,
Bucket = target.Bucket,
},
},
};

await PublishMeta(info, cancellationToken);

return info;
}

/// <summary>
/// Get object metadata by key.
/// </summary>
Expand All @@ -291,12 +384,16 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
{
ValidateObjectName(key);

var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) };
var request = new StreamMsgGetRequest
{
LastBySubj = GetMetaSubject(key),
};
try
{
var response = await _stream.GetAsync(request, cancellationToken);

var data = NatsObjJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");
var base64String = Convert.FromBase64String(response.Message.Data);
var data = NatsObjJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(base64String)) ?? throw new NatsObjException("Can't deserialize object metadata");

if (!showDeleted && data.Deleted)
{
Expand All @@ -316,6 +413,44 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
}
}

public async IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
opts ??= new NatsObjWatchOpts();

var deliverPolicy = ConsumerConfigurationDeliverPolicy.all;
if (opts.UpdatesOnly)
{
deliverPolicy = ConsumerConfigurationDeliverPolicy.@new;
}

await using var pushConsumer = new NatsJSOrderedPushConsumer<NatsMemoryOwner<byte>>(
_context,
$"OBJ_{_bucket}",
$"$O.{_bucket}.M.>",
new NatsJSOrderedPushConsumerOpts
{
DeliverPolicy = deliverPolicy,
},
new NatsSubOpts(),
cancellationToken);

pushConsumer.Init();

await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken))
{
if (pushConsumer.IsDone)
continue;
using (msg.Data)
{
var info = JsonSerializer.Deserialize(msg.Data.Memory.Span, NatsObjJsonSerializerContext.Default.ObjectMetadata);
if (info != null)
{
yield return info;
}
}
}
}

/// <summary>
/// Delete an object by key.
/// </summary>
Expand Down Expand Up @@ -382,3 +517,8 @@ private void ValidateObjectName(string name)
}
}
}

public record NatsObjWatchOpts
{
public bool UpdatesOnly { get; init; }
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Services/NatsSvcConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ public NatsSvcConfig(string name, string version)
/// Stats handler. JSON object returned by this handler will be included in
/// the service stats <c>data</c> property.
/// </summary>
public Func<JsonNode>? StatsHandler { get; init; }
public Func<INatsSvcEndpoint, JsonNode>? StatsHandler { get; init; }
}
Loading

0 comments on commit be058d5

Please sign in to comment.