From 1955938745ea62843ea562a2a45e23a1f6238631 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 21 Oct 2023 12:29:43 -0600 Subject: [PATCH] kadm: do not reuse ApiVersions in many concurrent requests The client calls SetVersion internally per request, so doing this concurrently leads to races. --- pkg/kadm/acls.go | 2 +- pkg/kadm/misc.go | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/kadm/acls.go b/pkg/kadm/acls.go index 9e12cf00..c213306c 100644 --- a/pkg/kadm/acls.go +++ b/pkg/kadm/acls.go @@ -924,7 +924,7 @@ func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLs ) defer cancel() for i := range descs { - req := descs[i] + req := descs[i] // each req is unique per loop, we are not reusing req, this is safe myIdx := i wg.Add(1) go func() { diff --git a/pkg/kadm/misc.go b/pkg/kadm/misc.go index 160b4344..de4b786c 100644 --- a/pkg/kadm/misc.go +++ b/pkg/kadm/misc.go @@ -244,10 +244,6 @@ func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) { return nil, err } - req := kmsg.NewPtrApiVersionsRequest() - req.ClientSoftwareName = "kadm" - req.ClientSoftwareVersion = softwareVersion() - var mu sync.Mutex var wg sync.WaitGroup vs := make(BrokersApiVersions, len(m.Brokers)) @@ -256,6 +252,9 @@ func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) { wg.Add(1) go func() { defer wg.Done() + req := kmsg.NewPtrApiVersionsRequest() + req.ClientSoftwareName = "kadm" + req.ClientSoftwareVersion = softwareVersion() v := BrokerApiVersions{NodeID: n, keyVersions: make(map[int16]minmax)} v.raw, v.Err = req.RequestWith(ctx, cl.cl.Broker(int(n)))