Skip to content

Commit

Permalink
Key/Value and Object Store compression (#214)
Browse files Browse the repository at this point in the history
* Key/Value and Object Store compression

* Test server version check for stream compression

* Compression renaming
  • Loading branch information
mtmk authored Nov 16, 2023
1 parent 1bf5c01 commit 222eaeb
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 8 deletions.
5 changes: 5 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public record NatsKVConfig
/// </summary>
public NatsKVRepublish? Republish { get; init; }

/// <summary>
/// Use compressed storage.
/// </summary>
public bool Compression { get; init; }

// TODO: Bucket mirror configuration.
// pub mirror: Option<Source>,
// Bucket sources configuration.
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public async ValueTask<INatsKVStore> CreateStoreAsync(NatsKVConfig config, Cance
MaxBytes = config.MaxBytes,
MaxAge = config.MaxAge.ToNanos(),
MaxMsgSize = config.MaxValueSize,
Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none,
Storage = storage,
Republish = republish!,
AllowRollupHdrs = true,
Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ public async IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsD
public async ValueTask<NatsKVStatus> GetStatusAsync(CancellationToken cancellationToken = default)
{
await _stream.RefreshAsync(cancellationToken);
return new NatsKVStatus(Bucket, _stream.Info);
var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none;
return new NatsKVStatus(Bucket, isCompressed, _stream.Info);
}

/// <summary>
Expand Down Expand Up @@ -453,4 +454,4 @@ internal async ValueTask<NatsKVWatcher<T>> WatchInternalAsync<T>(string key, INa
}
}

public record NatsKVStatus(string Bucket, StreamInfo Info);
public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);
5 changes: 5 additions & 0 deletions src/NATS.Client.ObjectStore/NatsObjConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ public record NatsObjConfig(string Bucket)
/// Additional metadata for the bucket.
/// </summary>
public Dictionary<string, string>? Metadata { get; init; }

/// <summary>
/// Use compressed storage.
/// </summary>
public bool Compression { get; init; }
}
2 changes: 1 addition & 1 deletion src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async ValueTask<INatsObjStore> CreateObjectStore(NatsObjConfig config, Ca
AllowDirect = true,
Metadata = config.Metadata!,
Retention = StreamConfigurationRetention.limits,
Compression = StreamConfigurationCompression.none,
Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none,
};

var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ public IAsyncEnumerable<ObjectMetadata> ListAsync(NatsObjListOpts? opts = defaul
public async ValueTask<NatsObjStatus> GetStatusAsync(CancellationToken cancellationToken = default)
{
await _stream.RefreshAsync(cancellationToken);
return new NatsObjStatus(Bucket, _stream.Info);
var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none;
return new NatsObjStatus(Bucket, isCompressed, _stream.Info);
}

/// <summary>
Expand Down Expand Up @@ -678,4 +679,4 @@ public record NatsObjListOpts
public bool ShowDeleted { get; init; }
}

public record NatsObjStatus(string Bucket, StreamInfo Info);
public record NatsObjStatus(string Bucket, bool IsCompressed, StreamInfo Info);
29 changes: 29 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,4 +522,33 @@ public async Task Status()
Assert.Equal("KV_kv1", status.Info.Config.Name);
Assert.Equal(10, status.Info.Config.MaxMsgsPerSubject);
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Compressed_storage()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var store1 = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { Compression = false }, cancellationToken: cancellationToken);
var store2 = await kv.CreateStoreAsync(new NatsKVConfig("kv2") { Compression = true }, cancellationToken: cancellationToken);

Assert.Equal("kv1", store1.Bucket);
Assert.Equal("kv2", store2.Bucket);

var status1 = await store1.GetStatusAsync(cancellationToken);
Assert.Equal("kv1", status1.Bucket);
Assert.Equal("KV_kv1", status1.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression);

var status2 = await store2.GetStatusAsync(cancellationToken);
Assert.Equal("kv2", status2.Bucket);
Assert.Equal("KV_kv2", status2.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression);
}
}
30 changes: 29 additions & 1 deletion tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.IO.IsolatedStorage;
using System.Security.Cryptography;
using System.Text;
using NATS.Client.Core.Tests;
using NATS.Client.JetStream.Models;
using NATS.Client.ObjectStore.Internal;
using NATS.Client.ObjectStore.Models;

Expand Down Expand Up @@ -367,4 +367,32 @@ public async Task List()
Assert.True(infos[3].Deleted);
}
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Compressed_storage()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var store1 = await obj.CreateObjectStore(new NatsObjConfig("b1") { Compression = false }, cancellationToken);
var store2 = await obj.CreateObjectStore(new NatsObjConfig("b2") { Compression = true }, cancellationToken);

Assert.Equal("b1", store1.Bucket);
Assert.Equal("b2", store2.Bucket);

var status1 = await store1.GetStatusAsync(cancellationToken);
Assert.Equal("b1", status1.Bucket);
Assert.Equal("OBJ_b1", status1.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression);

var status2 = await store2.GetStatusAsync(cancellationToken);
Assert.Equal("b2", status2.Bucket);
Assert.Equal("OBJ_b2", status2.Info.Config.Name);
Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression);
}
}
10 changes: 8 additions & 2 deletions tests/NATS.Client.TestUtilities/NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ public static class ServerVersions

public class NatsServer : IAsyncDisposable
{
public static readonly Version Version;

private static readonly string Ext = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty;
private static readonly string NatsServerPath = $"nats-server{Ext}";
private static readonly Version Version;

private readonly string? _jetStreamStoreDir;
private readonly ITestOutputHelper _outputHelper;
Expand Down Expand Up @@ -550,11 +551,16 @@ public sealed class SkipIfNatsServer : FactAttribute

static SkipIfNatsServer() => SupportsTlsFirst = NatsServer.SupportsTlsFirst();

public SkipIfNatsServer(bool doesNotSupportTlsFirst = false)
public SkipIfNatsServer(bool doesNotSupportTlsFirst = false, string? versionEarlierThan = default)
{
if (doesNotSupportTlsFirst && !SupportsTlsFirst)
{
Skip = "NATS server doesn't support TLS first";
}

if (versionEarlierThan != null && new Version(versionEarlierThan) > NatsServer.Version)
{
Skip = $"NATS server version ({NatsServer.Version}) is earlier than {versionEarlierThan}";
}
}
}

0 comments on commit 222eaeb

Please sign in to comment.