diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index c5f27234..d772ec2d 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -41,7 +41,7 @@ const ( // for that batch. type CompressionCodec struct { codec codecType - level int8 + level int } // NoCompression is a compression option that avoids compression. This can @@ -68,10 +68,7 @@ func ZstdCompression() CompressionCodec { return CompressionCodec{codecZstd, 0} // // If the level is invalid, compressors just use a default level. func (c CompressionCodec) WithLevel(level int) CompressionCodec { - if level > 127 { - level = 127 // lz4 could theoretically be large, I guess - } - c.level = int8(level) + c.level = level return c } @@ -116,8 +113,8 @@ out: case codecGzip: level := gzip.DefaultCompression if codec.level != 0 { - if _, err := gzip.NewWriterLevel(nil, int(codec.level)); err != nil { - level = int(codec.level) + if _, err := gzip.NewWriterLevel(nil, codec.level); err != nil { + level = codec.level } } c.gzPool = sync.Pool{New: func() any { c, _ := gzip.NewWriterLevel(nil, level); return c }} diff --git a/pkg/kgo/compression_test.go b/pkg/kgo/compression_test.go index 29b247e4..699a2f25 100644 --- a/pkg/kgo/compression_test.go +++ b/pkg/kgo/compression_test.go @@ -7,8 +7,22 @@ import ( "reflect" "sync" "testing" + + "github.com/pierrec/lz4/v4" ) +// Regression test for #778. +func TestCompressionCodecLZ4WithSpecifiedLevel(t *testing.T) { + t.Parallel() + + codec := Lz4Compression().WithLevel(512) + w := lz4.NewWriter(new(bytes.Buffer)) + err := w.Apply(lz4.CompressionLevelOption(lz4.CompressionLevel(codec.level))) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + func TestNewCompressor(t *testing.T) { t.Parallel() for i, test := range []struct {