diff --git a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs index 5e6ba8c34..dad8b7086 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs @@ -56,6 +56,11 @@ public record NatsKVConfig /// public NatsKVRepublish? Republish { get; init; } + /// + /// Use compressed storage. + /// + public bool Compression { get; init; } + // TODO: Bucket mirror configuration. // pub mirror: Option, // Bucket sources configuration. diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 57c1e2fd9..9081d648f 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -94,6 +94,7 @@ public async ValueTask CreateStoreAsync(NatsKVConfig config, Cance MaxBytes = config.MaxBytes, MaxAge = config.MaxAge.ToNanos(), MaxMsgSize = config.MaxValueSize, + Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none, Storage = storage, Republish = republish!, AllowRollupHdrs = true, diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index 5362ec983..04c71ca1e 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -354,7 +354,8 @@ public async IAsyncEnumerable> HistoryAsync(string key, INatsD public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default) { await _stream.RefreshAsync(cancellationToken); - return new NatsKVStatus(Bucket, _stream.Info); + var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none; + return new NatsKVStatus(Bucket, isCompressed, _stream.Info); } /// @@ -457,4 +458,4 @@ internal async ValueTask> WatchInternalAsync(string key, INa } } -public record NatsKVStatus(string Bucket, StreamInfo Info); +public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info); diff --git a/src/NATS.Client.ObjectStore/NatsObjConfig.cs b/src/NATS.Client.ObjectStore/NatsObjConfig.cs index 30e3ae8d2..9df981fbb 100644 --- a/src/NATS.Client.ObjectStore/NatsObjConfig.cs +++ b/src/NATS.Client.ObjectStore/NatsObjConfig.cs @@ -44,4 +44,9 @@ public record NatsObjConfig(string Bucket) /// Additional metadata for the bucket. /// public Dictionary? Metadata { get; init; } + + /// + /// Use compressed storage. + /// + public bool Compression { get; init; } } diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs index 978b0b5f7..02141e703 100644 --- a/src/NATS.Client.ObjectStore/NatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -58,7 +58,7 @@ public async ValueTask CreateObjectStore(NatsObjConfig config, Ca AllowDirect = true, Metadata = config.Metadata!, Retention = StreamConfigurationRetention.limits, - Compression = StreamConfigurationCompression.none, + Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none, }; var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken); diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 8af3c8d1b..9fb6faaf7 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -533,7 +533,8 @@ public IAsyncEnumerable ListAsync(NatsObjListOpts? opts = defaul public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default) { await _stream.RefreshAsync(cancellationToken); - return new NatsObjStatus(Bucket, _stream.Info); + var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none; + return new NatsObjStatus(Bucket, isCompressed, _stream.Info); } /// @@ -678,4 +679,4 @@ public record NatsObjListOpts public bool ShowDeleted { get; init; } } -public record NatsObjStatus(string Bucket, StreamInfo Info); +public record NatsObjStatus(string Bucket, bool IsCompressed, StreamInfo Info); diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 7ae697012..c4521bfc8 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -522,4 +522,33 @@ public async Task Status() Assert.Equal("KV_kv1", status.Info.Config.Name); Assert.Equal(10, status.Info.Config.MaxMsgsPerSubject); } + + [SkipIfNatsServer(versionEarlierThan: "2.10")] + public async Task Compressed_storage() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + var store1 = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { Compression = false }, cancellationToken: cancellationToken); + var store2 = await kv.CreateStoreAsync(new NatsKVConfig("kv2") { Compression = true }, cancellationToken: cancellationToken); + + Assert.Equal("kv1", store1.Bucket); + Assert.Equal("kv2", store2.Bucket); + + var status1 = await store1.GetStatusAsync(cancellationToken); + Assert.Equal("kv1", status1.Bucket); + Assert.Equal("KV_kv1", status1.Info.Config.Name); + Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression); + + var status2 = await store2.GetStatusAsync(cancellationToken); + Assert.Equal("kv2", status2.Bucket); + Assert.Equal("KV_kv2", status2.Info.Config.Name); + Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression); + } } diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index 37f394ddc..421ae91b2 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -1,7 +1,7 @@ -using System.IO.IsolatedStorage; using System.Security.Cryptography; using System.Text; using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; using NATS.Client.ObjectStore.Internal; using NATS.Client.ObjectStore.Models; @@ -367,4 +367,32 @@ public async Task List() Assert.True(infos[3].Deleted); } } + + [SkipIfNatsServer(versionEarlierThan: "2.10")] + public async Task Compressed_storage() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var obj = new NatsObjContext(js); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + var store1 = await obj.CreateObjectStore(new NatsObjConfig("b1") { Compression = false }, cancellationToken); + var store2 = await obj.CreateObjectStore(new NatsObjConfig("b2") { Compression = true }, cancellationToken); + + Assert.Equal("b1", store1.Bucket); + Assert.Equal("b2", store2.Bucket); + + var status1 = await store1.GetStatusAsync(cancellationToken); + Assert.Equal("b1", status1.Bucket); + Assert.Equal("OBJ_b1", status1.Info.Config.Name); + Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression); + + var status2 = await store2.GetStatusAsync(cancellationToken); + Assert.Equal("b2", status2.Bucket); + Assert.Equal("OBJ_b2", status2.Info.Config.Name); + Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression); + } } diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index 8347b10cc..73802cc4d 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -22,9 +22,10 @@ public static class ServerVersions public class NatsServer : IAsyncDisposable { + public static readonly Version Version; + private static readonly string Ext = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty; private static readonly string NatsServerPath = $"nats-server{Ext}"; - private static readonly Version Version; private readonly string? _jetStreamStoreDir; private readonly ITestOutputHelper _outputHelper; @@ -550,11 +551,16 @@ public sealed class SkipIfNatsServer : FactAttribute static SkipIfNatsServer() => SupportsTlsFirst = NatsServer.SupportsTlsFirst(); - public SkipIfNatsServer(bool doesNotSupportTlsFirst = false) + public SkipIfNatsServer(bool doesNotSupportTlsFirst = false, string? versionEarlierThan = default) { if (doesNotSupportTlsFirst && !SupportsTlsFirst) { Skip = "NATS server doesn't support TLS first"; } + + if (versionEarlierThan != null && new Version(versionEarlierThan) > NatsServer.Version) + { + Skip = $"NATS server version ({NatsServer.Version}) is earlier than {versionEarlierThan}"; + } } }