Skip to content

Commit

Permalink
kgo: do not reuse decompression buffer for uncompressed batches
Browse files Browse the repository at this point in the history
ref: twmb#823 (comment)

Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman committed Oct 9, 2024
1 parent d8a4693 commit 8562b71
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/kgo/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *p
}

func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte {
if pool == nil {
return append([]byte(nil), decoded...)
}
if compCodec == codecSnappy {
// We already know the actual size of the decoded buffer before decompression,
// so there's no need to copy the buffer.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,13 @@ type Record struct {
// Once this method has been called, any reference to the passed record should be considered invalid by the caller,
// as it may be reused as a result of future calls to the PollFetches/PollRecords method.
func (r *Record) Reuse() {
if r.recordsPool != nil {
if r.rcRawRecordsBuffer != nil {
r.rcRawRecordsBuffer.release()
}
if r.rcBatchBuffer != nil {
r.rcBatchBuffer.release()
}
if r.recordsPool != nil {
r.recordsPool.put(r)
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,9 @@ func processRecordBatch(
}

rawRecords := batch.Records
if compression := byte(batch.Attributes & 0x0007); compression != 0 {

compression := byte(batch.Attributes & 0x0007)
if compression != 0 {
var err error
if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil {
return 0, 0 // truncated batch
Expand Down Expand Up @@ -1616,7 +1618,7 @@ func processRecordBatch(
rcBatchBuff *rcBuffer[byte]
rcRawRecordsBuff *rcBuffer[kmsg.Record]
)
if o.recordPool != nil {
if o.recordPool != nil && codecType(compression) != codecNone {
rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool)
rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool)
}
Expand Down

0 comments on commit 8562b71

Please sign in to comment.