Skip to content

Commit

Permalink
optimise: use byteBuffer pool in decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbhor committed Jul 10, 2024
1 parent e403d59 commit 5fcbd7f
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions pkg/kgo/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,38 +255,43 @@ type zstdDecoder struct {
}

func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
switch codecType(codec) {
case codecNone:
// Early return in case there is no compression
compCodec := codecType(codec)
if compCodec == codecNone {
return src, nil
}
out := byteBuffers.Get().(*bytes.Buffer)
out.Reset()
defer byteBuffers.Put(out)

switch compCodec {
case codecGzip:
ungz := d.ungzPool.Get().(*gzip.Reader)
defer d.ungzPool.Put(ungz)
if err := ungz.Reset(bytes.NewReader(src)); err != nil {
return nil, err
}
out := new(bytes.Buffer)
if _, err := io.Copy(out, ungz); err != nil {
return nil, err
}
return out.Bytes(), nil
return append([]byte(nil), out.Bytes()...), nil
case codecSnappy:
if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) {
return xerialDecode(src)
}
return s2.Decode(nil, src)
return s2.Decode(out.Bytes(), src)
case codecLZ4:
unlz4 := d.unlz4Pool.Get().(*lz4.Reader)
defer d.unlz4Pool.Put(unlz4)
unlz4.Reset(bytes.NewReader(src))
out := new(bytes.Buffer)
if _, err := io.Copy(out, unlz4); err != nil {
return nil, err
}
return out.Bytes(), nil
return append([]byte(nil), out.Bytes()...), nil
case codecZstd:
unzstd := d.unzstdPool.Get().(*zstdDecoder)
defer d.unzstdPool.Put(unzstd)
return unzstd.inner.DecodeAll(src, nil)
return unzstd.inner.DecodeAll(src, out.Bytes())
default:
return nil, errors.New("unknown compression codec")
}
Expand Down

0 comments on commit 5fcbd7f

Please sign in to comment.