Skip to content

Commit

Permalink
kgo: turn DecompressBufferPool into an optional field (#6) (#7)
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman authored Oct 8, 2024
1 parent f8f76a4 commit b76e947
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 16 deletions.
32 changes: 21 additions & 11 deletions pkg/kgo/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"

"github.com/twmb/franz-go/pkg/kgo/internal/pool"
"github.com/twmb/franz-go/pkg/kgo/pool"
)

var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }}
Expand Down Expand Up @@ -274,17 +273,28 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo
if compCodec == codecNone {
return src, nil
}
var (
out *bytes.Buffer
buf []byte
err error
)

out, buf, err := d.getDecodedBuffer(src, compCodec, pool)
if err != nil {
return nil, err
}
defer func() {
if compCodec == codecSnappy {
return
if pool != nil {
out, buf, err = d.getDecodedBuffer(src, compCodec, pool)
if err != nil {
return nil, err
}
pool.Put(buf)
}()
defer func() {
if compCodec == codecSnappy {
return
}
pool.Put(buf)
}()
} else {
out = byteBuffers.Get().(*bytes.Buffer)
out.Reset()
defer byteBuffers.Put(out)
}

switch compCodec {
case codecGzip:
Expand Down
3 changes: 1 addition & 2 deletions pkg/kgo/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
"testing"

"github.com/pierrec/lz4/v4"

"github.com/twmb/franz-go/pkg/kgo/internal/pool"
"github.com/twmb/franz-go/pkg/kgo/pool"
)

// Regression test for #778.
Expand Down
3 changes: 1 addition & 2 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"sync"
"time"

"github.com/twmb/franz-go/pkg/kgo/pool"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
"github.com/twmb/franz-go/pkg/sasl"

"github.com/twmb/franz-go/pkg/kgo/internal/pool"
)

// Opt is an option to configure a client.
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/twmb/franz-go/pkg/kbin"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo/internal/pool"
"github.com/twmb/franz-go/pkg/kgo/pool"
"github.com/twmb/franz-go/pkg/kmsg"
)

Expand Down

0 comments on commit b76e947

Please sign in to comment.