diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index 27bfe6db..c5f27234 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -259,10 +259,10 @@ type zstdDecoder struct { } func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { - switch codec { - case 0: + switch codecType(codec) { + case codecNone: return src, nil - case 1: + case codecGzip: ungz := d.ungzPool.Get().(*gzip.Reader) defer d.ungzPool.Put(ungz) if err := ungz.Reset(bytes.NewReader(src)); err != nil { @@ -273,12 +273,12 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { return nil, err } return out.Bytes(), nil - case 2: + case codecSnappy: if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) { return xerialDecode(src) } return s2.Decode(nil, src) - case 3: + case codecLZ4: unlz4 := d.unlz4Pool.Get().(*lz4.Reader) defer d.unlz4Pool.Put(unlz4) unlz4.Reset(bytes.NewReader(src)) @@ -287,7 +287,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { return nil, err } return out.Bytes(), nil - case 4: + case codecZstd: unzstd := d.unzstdPool.Get().(*zstdDecoder) defer d.unzstdPool.Put(unzstd) return unzstd.inner.DecodeAll(src, nil) diff --git a/pkg/kgo/record_formatter.go b/pkg/kgo/record_formatter.go index 932cc5ce..d16427be 100644 --- a/pkg/kgo/record_formatter.go +++ b/pkg/kgo/record_formatter.go @@ -444,16 +444,16 @@ func NewRecordFormatter(layout string) (*RecordFormatter, error) { layout = layout[len("compression}"):] f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte { return writeR(b, r, func(b []byte, r *Record) []byte { - switch r.Attrs.CompressionType() { - case 0: + switch codecType(r.Attrs.CompressionType()) { + case codecNone: return append(b, "none"...) - case 1: + case codecGzip: return append(b, "gzip"...) - case 2: + case codecSnappy: return append(b, "snappy"...) - case 3: + case codecLZ4: return append(b, "lz4"...) - case 4: + case codecZstd: return append(b, "zstd"...) default: return append(b, "unknown"...)