Skip to content

Commit

Permalink
Merge pull request #697 from cristaloleg/use-codectype-explicitly
Browse files Browse the repository at this point in the history
pkg/kgo: use codecType explicitly
  • Loading branch information
twmb authored May 26, 2024
2 parents dc20ca0 + 100aa49 commit 48855f9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
12 changes: 6 additions & 6 deletions pkg/kgo/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ type zstdDecoder struct {
}

func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
switch codec {
case 0:
switch codecType(codec) {
case codecNone:
return src, nil
case 1:
case codecGzip:
ungz := d.ungzPool.Get().(*gzip.Reader)
defer d.ungzPool.Put(ungz)
if err := ungz.Reset(bytes.NewReader(src)); err != nil {
Expand All @@ -273,12 +273,12 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
return nil, err
}
return out.Bytes(), nil
case 2:
case codecSnappy:
if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) {
return xerialDecode(src)
}
return s2.Decode(nil, src)
case 3:
case codecLZ4:
unlz4 := d.unlz4Pool.Get().(*lz4.Reader)
defer d.unlz4Pool.Put(unlz4)
unlz4.Reset(bytes.NewReader(src))
Expand All @@ -287,7 +287,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
return nil, err
}
return out.Bytes(), nil
case 4:
case codecZstd:
unzstd := d.unzstdPool.Get().(*zstdDecoder)
defer d.unzstdPool.Put(unzstd)
return unzstd.inner.DecodeAll(src, nil)
Expand Down
12 changes: 6 additions & 6 deletions pkg/kgo/record_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,16 +444,16 @@ func NewRecordFormatter(layout string) (*RecordFormatter, error) {
layout = layout[len("compression}"):]
f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte {
return writeR(b, r, func(b []byte, r *Record) []byte {
switch r.Attrs.CompressionType() {
case 0:
switch codecType(r.Attrs.CompressionType()) {
case codecNone:
return append(b, "none"...)
case 1:
case codecGzip:
return append(b, "gzip"...)
case 2:
case codecSnappy:
return append(b, "snappy"...)
case 3:
case codecLZ4:
return append(b, "lz4"...)
case 4:
case codecZstd:
return append(b, "zstd"...)
default:
return append(b, "unknown"...)
Expand Down

0 comments on commit 48855f9

Please sign in to comment.