From e5d53478932546d676bb2fd53e54b920f67dfb6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Wed, 9 Oct 2024 09:41:34 +0200 Subject: [PATCH] kgo: do not reuse decompression buffer for uncompressed batches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ref: https://github.com/twmb/franz-go/issues/823#issuecomment-2400842614 Signed-off-by: Miguel Ángel Ortuño --- pkg/kgo/record_and_fetch.go | 6 +++++- pkg/kgo/source.go | 6 ++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index dbfb9451..c82d7539 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -178,9 +178,13 @@ type Record struct { // Once this method has been called, any reference to the passed record should be considered invalid by the caller, // as it may be reused as a result of future calls to the PollFetches/PollRecords method. func (r *Record) Reuse() { - if r.recordsPool != nil { + if r.rcRawRecordsBuffer != nil { r.rcRawRecordsBuffer.release() + } + if r.rcBatchBuffer != nil { r.rcBatchBuffer.release() + } + if r.recordsPool != nil { r.recordsPool.put(r) } } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0f2d8962..51f87374 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1583,7 +1583,9 @@ func processRecordBatch( } rawRecords := batch.Records - if compression := byte(batch.Attributes & 0x0007); compression != 0 { + + compression := byte(batch.Attributes & 0x0007) + if compression != 0 { var err error if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil { return 0, 0 // truncated batch @@ -1616,7 +1618,7 @@ func processRecordBatch( rcBatchBuff *rcBuffer[byte] rcRawRecordsBuff *rcBuffer[kmsg.Record] ) - if o.recordPool != nil { + if o.recordPool != nil && codecType(compression) != codecNone { rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool) rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool) }