From d7ea2c3b615ca470cc5219edba68a6ba7a98e8b3 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Wed, 17 Jul 2024 12:16:08 -0400 Subject: [PATCH 1/2] fix setting lz4 compression levels --- pkg/kgo/compression.go | 7 ++----- pkg/kgo/compression_test.go | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index c5f27234..67389eb2 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -41,7 +41,7 @@ const ( // for that batch. type CompressionCodec struct { codec codecType - level int8 + level int } // NoCompression is a compression option that avoids compression. This can @@ -68,10 +68,7 @@ func ZstdCompression() CompressionCodec { return CompressionCodec{codecZstd, 0} // // If the level is invalid, compressors just use a default level. func (c CompressionCodec) WithLevel(level int) CompressionCodec { - if level > 127 { - level = 127 // lz4 could theoretically be large, I guess - } - c.level = int8(level) + c.level = level return c } diff --git a/pkg/kgo/compression_test.go b/pkg/kgo/compression_test.go index 29b247e4..699a2f25 100644 --- a/pkg/kgo/compression_test.go +++ b/pkg/kgo/compression_test.go @@ -7,8 +7,22 @@ import ( "reflect" "sync" "testing" + + "github.com/pierrec/lz4/v4" ) +// Regression test for #778. +func TestCompressionCodecLZ4WithSpecifiedLevel(t *testing.T) { + t.Parallel() + + codec := Lz4Compression().WithLevel(512) + w := lz4.NewWriter(new(bytes.Buffer)) + err := w.Apply(lz4.CompressionLevelOption(lz4.CompressionLevel(codec.level))) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + func TestNewCompressor(t *testing.T) { t.Parallel() for i, test := range []struct { From 93471485d27ead3298e501ab0c6295eced8d25b9 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Sat, 20 Jul 2024 20:46:56 -0400 Subject: [PATCH 2/2] remove unnecessary conversions --- pkg/kgo/compression.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index 67389eb2..d772ec2d 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -113,8 +113,8 @@ out: case codecGzip: level := gzip.DefaultCompression if codec.level != 0 { - if _, err := gzip.NewWriterLevel(nil, int(codec.level)); err != nil { - level = int(codec.level) + if _, err := gzip.NewWriterLevel(nil, codec.level); err != nil { + level = codec.level } } c.gzPool = sync.Pool{New: func() any { c, _ := gzip.NewWriterLevel(nil, level); return c }}