Skip to content

Commit

Permalink
[ADDED] Compression option on KV bucket
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Oct 30, 2023
1 parent 610da83 commit cf1f31d
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
15 changes: 15 additions & 0 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type (
RePublish *RePublish
Mirror *StreamSource
Sources []*StreamSource

// Enable underlying stream compression.
// NOTE: Compression is supported for nats-server 2.10.0+
Compression bool
}

KeyValueLister interface {
Expand Down Expand Up @@ -123,6 +127,9 @@ type (

// Bytes returns the size in bytes of the bucket
Bytes() uint64

// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}

// KeyWatcher is what is returned when doing a watch.
Expand Down Expand Up @@ -322,6 +329,10 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
duplicateWindow = cfg.TTL
}
var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Expand All @@ -339,6 +350,7 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
MaxConsumers: -1,
AllowDirect: true,
RePublish: cfg.RePublish,
Compression: compression,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
Expand Down Expand Up @@ -479,6 +491,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
// Bytes is the size of the stream
func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }

// IsCompressed indicates if the data is compressed on disk
func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }

type kvLister struct {
kvs chan KeyValueStatus
kvNames chan string
Expand Down
35 changes: 35 additions & 0 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,3 +1356,38 @@ func expectErr(t *testing.T, err error, expected ...error) {
}
t.Fatalf("Expected one of %+v, got '%v'", expected, err)
}

func TestKeyValueCompression(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

kvCompressed, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
Compression: true,
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

status, err := kvCompressed.Status(ctx)
if err != nil {
t.Fatalf("Error getting bucket status: %v", err)
}

if !status.IsCompressed() {
t.Fatalf("Expected bucket to be compressed")
}

kvStream, err := js.Stream(ctx, "KV_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}

if kvStream.CachedInfo().Config.Compression != jetstream.S2Compression {
t.Fatalf("Expected stream to be compressed with S2")
}
}
15 changes: 15 additions & 0 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type KeyValueStatus interface {

// Bytes returns the size in bytes of the bucket
Bytes() uint64

// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}

// KeyWatcher is what is returned when doing a watch.
Expand Down Expand Up @@ -249,6 +252,10 @@ type KeyValueConfig struct {
RePublish *RePublish
Mirror *StreamSource
Sources []*StreamSource

// Enable underlying stream compression.
// NOTE: Compression is supported for nats-server 2.10.0+
Compression bool
}

// Used to watch all keys.
Expand Down Expand Up @@ -405,6 +412,10 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
duplicateWindow = cfg.TTL
}
var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Expand All @@ -422,6 +433,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
MaxConsumers: -1,
AllowDirect: true,
RePublish: cfg.RePublish,
Compression: compression,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
Expand Down Expand Up @@ -1040,6 +1052,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
// Bytes is the size of the stream
func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }

// IsCompressed indicates if the data is compressed on disk
func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }

// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
Expand Down
34 changes: 34 additions & 0 deletions test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,3 +1395,37 @@ func TestKeyValueSourcing(t *testing.T) {
t.Fatalf("Got error getting keyB from C: %v", err)
}
}

func TestKeyValueCompression(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "A",
Compression: true,
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

status, err := kv.Status()
if err != nil {
t.Fatalf("Error getting bucket status: %v", err)
}

if !status.IsCompressed() {
t.Fatalf("Expected bucket to be compressed")
}

kvStream, err := js.StreamInfo("KV_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}

if kvStream.Config.Compression != nats.S2Compression {
t.Fatalf("Expected stream to be compressed with S2")
}
}

0 comments on commit cf1f31d

Please sign in to comment.