From b76e947fc69d7536a51358f891f206d62dda8c11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Tue, 8 Oct 2024 16:29:45 +0200 Subject: [PATCH] kgo: turn `DecompressBufferPool` into an optional field (#6) (#7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miguel Ángel Ortuño --- pkg/kgo/compression.go | 32 ++++++++++++------- pkg/kgo/compression_test.go | 3 +- pkg/kgo/config.go | 3 +- pkg/kgo/{internal => }/pool/bucketed_pool.go | 0 .../{internal => }/pool/bucketed_pool_test.go | 0 pkg/kgo/source.go | 2 +- 6 files changed, 24 insertions(+), 16 deletions(-) rename pkg/kgo/{internal => }/pool/bucketed_pool.go (100%) rename pkg/kgo/{internal => }/pool/bucketed_pool_test.go (100%) diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index 1adbe69a..2e75346d 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -12,8 +12,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" - - "github.com/twmb/franz-go/pkg/kgo/internal/pool" + "github.com/twmb/franz-go/pkg/kgo/pool" ) var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }} @@ -274,17 +273,28 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo if compCodec == codecNone { return src, nil } + var ( + out *bytes.Buffer + buf []byte + err error + ) - out, buf, err := d.getDecodedBuffer(src, compCodec, pool) - if err != nil { - return nil, err - } - defer func() { - if compCodec == codecSnappy { - return + if pool != nil { + out, buf, err = d.getDecodedBuffer(src, compCodec, pool) + if err != nil { + return nil, err } - pool.Put(buf) - }() + defer func() { + if compCodec == codecSnappy { + return + } + pool.Put(buf) + }() + } else { + out = byteBuffers.Get().(*bytes.Buffer) + out.Reset() + defer byteBuffers.Put(out) + } switch compCodec { case codecGzip: diff --git a/pkg/kgo/compression_test.go b/pkg/kgo/compression_test.go index 005e3dfd..568c21b4 100644 --- a/pkg/kgo/compression_test.go +++ b/pkg/kgo/compression_test.go @@ -10,8 +10,7 @@ import ( "testing" "github.com/pierrec/lz4/v4" - - "github.com/twmb/franz-go/pkg/kgo/internal/pool" + "github.com/twmb/franz-go/pkg/kgo/pool" ) // Regression test for #778. diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index dabaf03f..20006472 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -13,11 +13,10 @@ import ( "sync" "time" + "github.com/twmb/franz-go/pkg/kgo/pool" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" "github.com/twmb/franz-go/pkg/sasl" - - "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) // Opt is an option to configure a client. diff --git a/pkg/kgo/internal/pool/bucketed_pool.go b/pkg/kgo/pool/bucketed_pool.go similarity index 100% rename from pkg/kgo/internal/pool/bucketed_pool.go rename to pkg/kgo/pool/bucketed_pool.go diff --git a/pkg/kgo/internal/pool/bucketed_pool_test.go b/pkg/kgo/pool/bucketed_pool_test.go similarity index 100% rename from pkg/kgo/internal/pool/bucketed_pool_test.go rename to pkg/kgo/pool/bucketed_pool_test.go diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 69fafba7..0f2d8962 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -14,7 +14,7 @@ import ( "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kerr" - "github.com/twmb/franz-go/pkg/kgo/internal/pool" + "github.com/twmb/franz-go/pkg/kgo/pool" "github.com/twmb/franz-go/pkg/kmsg" )