Skip to content

Commit 46a6744

Browse files
committed
Added benchmark for using kgo.FetchMaxPartitionBytes
1 parent 351e7fa commit 46a6744

File tree

7 files changed

+508
-0
lines changed

7 files changed

+508
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Benchmark for using kgo.FetchMaxPartitionBytes
2+
3+
Running the benchmark:
4+
1. Run: ```go test -bench . -benchmem -count 15 -cpu=1 > testrun_normal.txt```
5+
2. Uncomment ```//kgo.FetchMaxPartitionBytes(10*1024*1024),``` on line 53 in ./client_benchmark_test.go
6+
3. Run ```go test -bench . -benchmem -count 15 -cpu=1 > testrun_with_FetchMaxPartitionBytes10MB.txt```
7+
4. Run ```go install golang.org/x/perf/cmd/benchstat@latest```
8+
5. Run ``` benchstat testrun_normal.txt testrun_with_FetchMaxPartitionBytes10MB.txt > benchstat_results.txt```
9+
10+
In the results you can see that there is more than 300% in the memory allocation.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
goos: darwin
2+
goarch: arm64
3+
pkg: github.com/utilitywarehouse/franz-go/examples/bench/bench-fetch-max-partition-bytes
4+
│ testrun_normal.txt │ testrun_with_FetchMaxPartitionBytes10MB.txt │
5+
│ sec/op │ sec/op vs base │
6+
PollRecords 3.396 ± 1% 3.698 ± 1% +8.89% (p=0.000 n=15)
7+
8+
│ testrun_normal.txt │ testrun_with_FetchMaxPartitionBytes10MB.txt │
9+
│ B/op │ B/op vs base │
10+
PollRecords 126.6Mi ± 0% 625.3Mi ± 0% +393.82% (p=0.000 n=15)
11+
12+
│ testrun_normal.txt │ testrun_with_FetchMaxPartitionBytes10MB.txt │
13+
│ allocs/op │ allocs/op vs base │
14+
PollRecords 13.75k ± 0% 61.91k ± 0% +350.37% (p=0.000 n=15)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package benchmark_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
"github.com/testcontainers/testcontainers-go/modules/redpanda"
11+
"github.com/twmb/franz-go/pkg/kadm"
12+
"github.com/twmb/franz-go/pkg/kgo"
13+
14+
"crypto/rand"
15+
"github.com/google/uuid"
16+
"github.com/testcontainers/testcontainers-go"
17+
"io"
18+
"log"
19+
)
20+
21+
const (
22+
totalEntries = 500000
23+
)
24+
25+
func BenchmarkPollRecords(b *testing.B) {
26+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
27+
b.Cleanup(cancel)
28+
29+
testcontainers.Logger = log.New(io.Discard, "", log.LstdFlags)
30+
31+
ctnr, err := redpanda.RunContainer(ctx, redpanda.WithAutoCreateTopics())
32+
require.NoError(b, err)
33+
34+
broker, err := ctnr.KafkaSeedBroker(ctx)
35+
require.NoError(b, err)
36+
37+
b.Cleanup(func() {
38+
if err := ctnr.Terminate(context.Background()); err != nil {
39+
b.Logf("failed terminating container: %v", err)
40+
}
41+
})
42+
43+
topicName := newRandomName("test-topic")
44+
require.NoError(b, populateTopic(ctx, b, broker, topicName))
45+
46+
b.ResetTimer()
47+
consumer, err := kgo.NewClient(
48+
kgo.SeedBrokers(broker),
49+
kgo.ConsumerGroup(newRandomName("test-consumer")),
50+
kgo.ConsumeTopics(topicName),
51+
kgo.BlockRebalanceOnPoll(),
52+
// use 10MB
53+
//kgo.FetchMaxPartitionBytes(10*1024*1024),
54+
)
55+
56+
require.NoError(b, err)
57+
defer consumer.CloseAllowingRebalance()
58+
59+
consumedTotal := 0
60+
61+
for i := 0; i < b.N; i++ {
62+
fetches := consumer.PollRecords(ctx, 1000)
63+
require.NoError(b, fetches.Err0())
64+
consumedTotal += len(fetches.Records())
65+
if consumedTotal >= totalEntries {
66+
return
67+
}
68+
}
69+
}
70+
71+
func populateTopic(ctx context.Context, t *testing.B, broker string, topicName string) error {
72+
t.Log("Start topic population")
73+
producer, err := kgo.NewClient(
74+
kgo.SeedBrokers(broker),
75+
kgo.DefaultProduceTopic(topicName),
76+
)
77+
require.NoError(t, err)
78+
defer producer.Close()
79+
80+
adm := kadm.NewClient(producer)
81+
partitions := 10
82+
tr, err := adm.CreateTopic(ctx, int32(partitions), 1, nil, topicName)
83+
require.NoError(t, tr.Err)
84+
require.NoError(t, err)
85+
86+
data := make([]byte, 10240) // 10KB per message
87+
_, err = rand.Read(data)
88+
require.NoError(t, err)
89+
90+
batchLen := 5000
91+
for it := 0; it < totalEntries/batchLen; it++ {
92+
recs := make([]*kgo.Record, batchLen)
93+
for i := 0; i < batchLen; i++ {
94+
recs[i] = &kgo.Record{
95+
Value: data,
96+
Key: []byte(fmt.Sprintf("key-%d", i%partitions)),
97+
}
98+
}
99+
require.NoError(t, producer.ProduceSync(ctx, recs...).FirstErr())
100+
}
101+
return err
102+
}
103+
104+
func newRandomName(baseName string) string {
105+
return baseName + "-" + uuid.NewString()
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
module github.com/utilitywarehouse/franz-go/examples/bench/bench-fetch-max-partition-bytes
2+
3+
go 1.22.1
4+
5+
require (
6+
github.com/google/uuid v1.6.0
7+
github.com/stretchr/testify v1.9.0
8+
github.com/testcontainers/testcontainers-go v0.29.1
9+
github.com/testcontainers/testcontainers-go/modules/redpanda v0.29.1
10+
github.com/twmb/franz-go v1.16.1
11+
github.com/twmb/franz-go/pkg/kadm v1.11.0
12+
)
13+
14+
require (
15+
dario.cat/mergo v1.0.0 // indirect
16+
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
17+
github.com/Microsoft/go-winio v0.6.1 // indirect
18+
github.com/Microsoft/hcsshim v0.11.4 // indirect
19+
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
20+
github.com/containerd/containerd v1.7.12 // indirect
21+
github.com/containerd/log v0.1.0 // indirect
22+
github.com/cpuguy83/dockercfg v0.3.1 // indirect
23+
github.com/davecgh/go-spew v1.1.1 // indirect
24+
github.com/distribution/reference v0.5.0 // indirect
25+
github.com/docker/docker v25.0.3+incompatible // indirect
26+
github.com/docker/go-connections v0.5.0 // indirect
27+
github.com/docker/go-units v0.5.0 // indirect
28+
github.com/felixge/httpsnoop v1.0.3 // indirect
29+
github.com/go-logr/logr v1.2.4 // indirect
30+
github.com/go-logr/stdr v1.2.2 // indirect
31+
github.com/go-ole/go-ole v1.2.6 // indirect
32+
github.com/gogo/protobuf v1.3.2 // indirect
33+
github.com/golang/protobuf v1.5.3 // indirect
34+
github.com/klauspost/compress v1.17.4 // indirect
35+
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
36+
github.com/magiconair/properties v1.8.7 // indirect
37+
github.com/moby/patternmatcher v0.6.0 // indirect
38+
github.com/moby/sys/sequential v0.5.0 // indirect
39+
github.com/moby/sys/user v0.1.0 // indirect
40+
github.com/moby/term v0.5.0 // indirect
41+
github.com/morikuni/aec v1.0.0 // indirect
42+
github.com/opencontainers/go-digest v1.0.0 // indirect
43+
github.com/opencontainers/image-spec v1.1.0 // indirect
44+
github.com/pierrec/lz4/v4 v4.1.19 // indirect
45+
github.com/pkg/errors v0.9.1 // indirect
46+
github.com/pmezard/go-difflib v1.0.0 // indirect
47+
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
48+
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
49+
github.com/shoenig/go-m1cpu v0.1.6 // indirect
50+
github.com/sirupsen/logrus v1.9.3 // indirect
51+
github.com/tklauser/go-sysconf v0.3.12 // indirect
52+
github.com/tklauser/numcpus v0.6.1 // indirect
53+
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
54+
github.com/yusufpapurcu/wmi v1.2.3 // indirect
55+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
56+
go.opentelemetry.io/otel v1.19.0 // indirect
57+
go.opentelemetry.io/otel/metric v1.19.0 // indirect
58+
go.opentelemetry.io/otel/trace v1.19.0 // indirect
59+
golang.org/x/crypto v0.17.0 // indirect
60+
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
61+
golang.org/x/mod v0.16.0 // indirect
62+
golang.org/x/sys v0.16.0 // indirect
63+
golang.org/x/tools v0.13.0 // indirect
64+
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
65+
google.golang.org/grpc v1.58.3 // indirect
66+
google.golang.org/protobuf v1.31.0 // indirect
67+
gopkg.in/yaml.v3 v3.0.1 // indirect
68+
)

0 commit comments

Comments
 (0)