diff --git a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs
index 5e6ba8c34..dad8b7086 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs
@@ -56,6 +56,11 @@ public record NatsKVConfig
///
public NatsKVRepublish? Republish { get; init; }
+ ///
+ /// Use compressed storage.
+ ///
+ public bool Compression { get; init; }
+
// TODO: Bucket mirror configuration.
// pub mirror: Option,
// Bucket sources configuration.
diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs
index 57c1e2fd9..9081d648f 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs
@@ -94,6 +94,7 @@ public async ValueTask CreateStoreAsync(NatsKVConfig config, Cance
MaxBytes = config.MaxBytes,
MaxAge = config.MaxAge.ToNanos(),
MaxMsgSize = config.MaxValueSize,
+ Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none,
Storage = storage,
Republish = republish!,
AllowRollupHdrs = true,
diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs
index 5362ec983..04c71ca1e 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs
@@ -354,7 +354,8 @@ public async IAsyncEnumerable> HistoryAsync(string key, INatsD
public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default)
{
await _stream.RefreshAsync(cancellationToken);
- return new NatsKVStatus(Bucket, _stream.Info);
+ var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none;
+ return new NatsKVStatus(Bucket, isCompressed, _stream.Info);
}
///
@@ -457,4 +458,4 @@ internal async ValueTask> WatchInternalAsync(string key, INa
}
}
-public record NatsKVStatus(string Bucket, StreamInfo Info);
+public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);
diff --git a/src/NATS.Client.ObjectStore/NatsObjConfig.cs b/src/NATS.Client.ObjectStore/NatsObjConfig.cs
index 30e3ae8d2..9df981fbb 100644
--- a/src/NATS.Client.ObjectStore/NatsObjConfig.cs
+++ b/src/NATS.Client.ObjectStore/NatsObjConfig.cs
@@ -44,4 +44,9 @@ public record NatsObjConfig(string Bucket)
/// Additional metadata for the bucket.
///
public Dictionary? Metadata { get; init; }
+
+ ///
+ /// Use compressed storage.
+ ///
+ public bool Compression { get; init; }
}
diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs
index 978b0b5f7..02141e703 100644
--- a/src/NATS.Client.ObjectStore/NatsObjContext.cs
+++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs
@@ -58,7 +58,7 @@ public async ValueTask CreateObjectStore(NatsObjConfig config, Ca
AllowDirect = true,
Metadata = config.Metadata!,
Retention = StreamConfigurationRetention.limits,
- Compression = StreamConfigurationCompression.none,
+ Compression = config.Compression ? StreamConfigurationCompression.s2 : StreamConfigurationCompression.none,
};
var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs
index 8af3c8d1b..9fb6faaf7 100644
--- a/src/NATS.Client.ObjectStore/NatsObjStore.cs
+++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs
@@ -533,7 +533,8 @@ public IAsyncEnumerable ListAsync(NatsObjListOpts? opts = defaul
public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default)
{
await _stream.RefreshAsync(cancellationToken);
- return new NatsObjStatus(Bucket, _stream.Info);
+ var isCompressed = _stream.Info.Config.Compression != StreamConfigurationCompression.none;
+ return new NatsObjStatus(Bucket, isCompressed, _stream.Info);
}
///
@@ -678,4 +679,4 @@ public record NatsObjListOpts
public bool ShowDeleted { get; init; }
}
-public record NatsObjStatus(string Bucket, StreamInfo Info);
+public record NatsObjStatus(string Bucket, bool IsCompressed, StreamInfo Info);
diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
index 7ae697012..c4521bfc8 100644
--- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
+++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
@@ -522,4 +522,33 @@ public async Task Status()
Assert.Equal("KV_kv1", status.Info.Config.Name);
Assert.Equal(10, status.Info.Config.MaxMsgsPerSubject);
}
+
+ [SkipIfNatsServer(versionEarlierThan: "2.10")]
+ public async Task Compressed_storage()
+ {
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+
+ var js = new NatsJSContext(nats);
+ var kv = new NatsKVContext(js);
+
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ var store1 = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { Compression = false }, cancellationToken: cancellationToken);
+ var store2 = await kv.CreateStoreAsync(new NatsKVConfig("kv2") { Compression = true }, cancellationToken: cancellationToken);
+
+ Assert.Equal("kv1", store1.Bucket);
+ Assert.Equal("kv2", store2.Bucket);
+
+ var status1 = await store1.GetStatusAsync(cancellationToken);
+ Assert.Equal("kv1", status1.Bucket);
+ Assert.Equal("KV_kv1", status1.Info.Config.Name);
+ Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression);
+
+ var status2 = await store2.GetStatusAsync(cancellationToken);
+ Assert.Equal("kv2", status2.Bucket);
+ Assert.Equal("KV_kv2", status2.Info.Config.Name);
+ Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression);
+ }
}
diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
index 37f394ddc..421ae91b2 100644
--- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
+++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
@@ -1,7 +1,7 @@
-using System.IO.IsolatedStorage;
using System.Security.Cryptography;
using System.Text;
using NATS.Client.Core.Tests;
+using NATS.Client.JetStream.Models;
using NATS.Client.ObjectStore.Internal;
using NATS.Client.ObjectStore.Models;
@@ -367,4 +367,32 @@ public async Task List()
Assert.True(infos[3].Deleted);
}
}
+
+ [SkipIfNatsServer(versionEarlierThan: "2.10")]
+ public async Task Compressed_storage()
+ {
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+ var js = new NatsJSContext(nats);
+ var obj = new NatsObjContext(js);
+
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ var store1 = await obj.CreateObjectStore(new NatsObjConfig("b1") { Compression = false }, cancellationToken);
+ var store2 = await obj.CreateObjectStore(new NatsObjConfig("b2") { Compression = true }, cancellationToken);
+
+ Assert.Equal("b1", store1.Bucket);
+ Assert.Equal("b2", store2.Bucket);
+
+ var status1 = await store1.GetStatusAsync(cancellationToken);
+ Assert.Equal("b1", status1.Bucket);
+ Assert.Equal("OBJ_b1", status1.Info.Config.Name);
+ Assert.Equal(StreamConfigurationCompression.none, status1.Info.Config.Compression);
+
+ var status2 = await store2.GetStatusAsync(cancellationToken);
+ Assert.Equal("b2", status2.Bucket);
+ Assert.Equal("OBJ_b2", status2.Info.Config.Name);
+ Assert.Equal(StreamConfigurationCompression.s2, status2.Info.Config.Compression);
+ }
}
diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs
index 8347b10cc..73802cc4d 100644
--- a/tests/NATS.Client.TestUtilities/NatsServer.cs
+++ b/tests/NATS.Client.TestUtilities/NatsServer.cs
@@ -22,9 +22,10 @@ public static class ServerVersions
public class NatsServer : IAsyncDisposable
{
+ public static readonly Version Version;
+
private static readonly string Ext = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? ".exe" : string.Empty;
private static readonly string NatsServerPath = $"nats-server{Ext}";
- private static readonly Version Version;
private readonly string? _jetStreamStoreDir;
private readonly ITestOutputHelper _outputHelper;
@@ -550,11 +551,16 @@ public sealed class SkipIfNatsServer : FactAttribute
static SkipIfNatsServer() => SupportsTlsFirst = NatsServer.SupportsTlsFirst();
- public SkipIfNatsServer(bool doesNotSupportTlsFirst = false)
+ public SkipIfNatsServer(bool doesNotSupportTlsFirst = false, string? versionEarlierThan = default)
{
if (doesNotSupportTlsFirst && !SupportsTlsFirst)
{
Skip = "NATS server doesn't support TLS first";
}
+
+ if (versionEarlierThan != null && new Version(versionEarlierThan) > NatsServer.Version)
+ {
+ Skip = $"NATS server version ({NatsServer.Version}) is earlier than {versionEarlierThan}";
+ }
}
}