From 32735850a978919a3e5a7fbbb0082a209e32e7cd Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Fri, 20 Oct 2023 19:30:11 -0600
Subject: [PATCH 1/3] go.{mod,sum}: pin to kmsg v1.7.0

---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index d945fef6..399672da 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.18
 require (
 	github.com/klauspost/compress v1.16.7
 	github.com/pierrec/lz4/v4 v4.1.18
-	github.com/twmb/franz-go/pkg/kmsg v1.6.1
+	github.com/twmb/franz-go/pkg/kmsg v1.7.0
 	golang.org/x/crypto v0.11.0
 )

diff --git a/go.sum b/go.sum
index 02d6ebc1..b9431395 100644
--- a/go.sum
+++ b/go.sum
@@ -2,7 +2,7 @@ github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGC
 github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
 github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
-github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM=
-github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
+github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
+github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
 golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
 golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=

From fe5a660fa633206bf5773f697595d60c077ed4a2 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Thu, 19 Oct 2023 16:54:17 -0600
Subject: [PATCH 2/3] kgo: add sharding for AddPartitionsToTxn for KIP-890

This is more of a forward-looking commit, in that kadm will eventually
introduce support for this.

We now handle v4 properly: even though KIP-890 dictates that v4 is not
meant to be sent by clients, it is still necessary, and not sending it
results in INVALID_TXN_STATE errors.

This also adds the WriteTxnMarkers sharder to the switch, though
nothing should really send that request, so it does not really fix any
bugs.

Kafka 3.6 frequently hits an NPE when handling produce requests (see
KAFKA-15653), so tests may occasionally fail against 3.6.
---
 pkg/kgo/client.go | 219 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 210 insertions(+), 9 deletions(-)

diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index d1368f7e..27e27df1 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -1325,6 +1325,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
 		*kmsg.ListGroupsRequest,           // key 16
 		*kmsg.DeleteRecordsRequest,        // key 21
 		*kmsg.OffsetForLeaderEpochRequest, // key 23
+		*kmsg.AddPartitionsToTxnRequest,   // key 24
+		*kmsg.WriteTxnMarkersRequest,      // key 27
 		*kmsg.DescribeConfigsRequest,      // key 32
 		*kmsg.AlterConfigsRequest,         // key 33
 		*kmsg.AlterReplicaLogDirsRequest,  // key 34
@@ -1775,8 +1777,6 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
 		// names, we delete no coordinator.
coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { return cl.broker(), nil }, coordinatorTypeTxn, "", req) return shard(coordinator, req, resp, err) - case *kmsg.AddPartitionsToTxnRequest: - return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req) case *kmsg.AddOffsetsToTxnRequest: return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req) case *kmsg.EndTxnRequest: @@ -1840,10 +1840,6 @@ func (cl *Client) handleReqWithCoordinator( // TXN case *kmsg.InitProducerIDResponse: code = t.ErrorCode - case *kmsg.AddPartitionsToTxnResponse: - if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 { - code = t.Topics[0].Partitions[0].ErrorCode - } case *kmsg.AddOffsetsToTxnResponse: code = t.ErrorCode case *kmsg.EndTxnResponse: @@ -2080,6 +2076,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res sharder = &deleteRecordsSharder{cl} case *kmsg.OffsetForLeaderEpochRequest: sharder = &offsetForLeaderEpochSharder{cl} + case *kmsg.AddPartitionsToTxnRequest: + sharder = &addPartitionsToTxnSharder{cl} case *kmsg.WriteTxnMarkersRequest: sharder = &writeTxnMarkersSharder{cl} case *kmsg.DescribeConfigsRequest: @@ -2767,9 +2765,16 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last broker: id, }) } + } else if len(req.Groups) == 1 { + single := offsetFetchGroupToReq(req.RequireStable, req.Groups[0]) + single.Groups = req.Groups + issues = append(issues, issueShard{ + req: single, + broker: id, + }) } else { issues = append(issues, issueShard{ - req: &pinReq{Request: req, pinMin: true, min: 8}, + req: &pinReq{Request: req, pinMin: len(req.Groups) > 1, min: 8}, broker: id, }) } @@ -2791,7 +2796,7 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last } func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error { - req := kreq.(*kmsg.OffsetFetchRequest) // we always issue pinned requests + req := kreq.(*kmsg.OffsetFetchRequest) resp := kresp.(*kmsg.OffsetFetchResponse) switch len(resp.Groups) { @@ -2876,9 +2881,16 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE for key := range uniq { req.CoordinatorKeys = append(req.CoordinatorKeys, key) } + if len(req.CoordinatorKeys) == 1 { + req.CoordinatorKey = req.CoordinatorKeys[0] + } splitReq := errors.Is(lastErr, errBrokerTooOld) if !splitReq { + // With only one key, we do not need to split nor pin this. 
+		if len(req.CoordinatorKeys) <= 1 {
+			return []issueShard{{req: req, any: true}}, false, nil
+		}
 		return []issueShard{{
 			req: &pinReq{Request: req, pinMin: true, min: 4},
 			any: true,
@@ -2899,7 +2911,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
 }
 
 func (*findCoordinatorSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
-	req := kreq.(*kmsg.FindCoordinatorRequest) // we always issue pinned requests
+	req := kreq.(*kmsg.FindCoordinatorRequest)
 	resp := kresp.(*kmsg.FindCoordinatorResponse)
 
 	switch len(resp.Coordinators) {
@@ -3293,6 +3305,195 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
 	return merged, firstErr
 }
 
+// handle sharding AddPartitionsToTxn, where v4+ switched to batch requests
+type addPartitionsToTxnSharder struct{ *Client }
+
+func addPartitionsReqToTxn(req *kmsg.AddPartitionsToTxnRequest) {
+	t := kmsg.NewAddPartitionsToTxnRequestTransaction()
+	t.TransactionalID = req.TransactionalID
+	t.ProducerID = req.ProducerID
+	t.ProducerEpoch = req.ProducerEpoch
+	for i := range req.Topics {
+		rt := &req.Topics[i]
+		tt := kmsg.NewAddPartitionsToTxnRequestTransactionTopic()
+		tt.Topic = rt.Topic
+		tt.Partitions = rt.Partitions
+		t.Topics = append(t.Topics, tt)
+	}
+	req.Transactions = append(req.Transactions, t)
+}
+
+func addPartitionsTxnToReq(req *kmsg.AddPartitionsToTxnRequest) {
+	if len(req.Transactions) != 1 {
+		return
+	}
+	t0 := &req.Transactions[0]
+	req.TransactionalID = t0.TransactionalID
+	req.ProducerID = t0.ProducerID
+	req.ProducerEpoch = t0.ProducerEpoch
+	for _, tt := range t0.Topics {
+		rt := kmsg.NewAddPartitionsToTxnRequestTopic()
+		rt.Topic = tt.Topic
+		rt.Partitions = tt.Partitions
+		req.Topics = append(req.Topics, rt)
+	}
+}
+
+func addPartitionsTxnToResp(resp *kmsg.AddPartitionsToTxnResponse) {
+	if len(resp.Transactions) == 0 {
+		return
+	}
+	t0 := &resp.Transactions[0]
+	for _, tt := range t0.Topics {
+		rt := kmsg.NewAddPartitionsToTxnResponseTopic()
+		rt.Topic = tt.Topic
+		for _, tp := range tt.Partitions {
+			rp := kmsg.NewAddPartitionsToTxnResponseTopicPartition()
+			rp.Partition = tp.Partition
+			rp.ErrorCode = tp.ErrorCode
+			rt.Partitions = append(rt.Partitions, rp)
+		}
+		resp.Topics = append(resp.Topics, rt)
+	}
+}
+
+func (cl *addPartitionsToTxnSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
+	req := kreq.(*kmsg.AddPartitionsToTxnRequest)
+
+	if len(req.Transactions) == 0 {
+		addPartitionsReqToTxn(req)
+	}
+	txnIDs := make([]string, 0, len(req.Transactions))
+	for i := range req.Transactions {
+		txnIDs = append(txnIDs, req.Transactions[i].TransactionalID)
+	}
+	coordinators := cl.loadCoordinators(ctx, coordinatorTypeTxn, txnIDs...)
+ + type unkerr struct { + err error + txn kmsg.AddPartitionsToTxnRequestTransaction + } + var ( + brokerReqs = make(map[int32]*kmsg.AddPartitionsToTxnRequest) + kerrs = make(map[*kerr.Error][]kmsg.AddPartitionsToTxnRequestTransaction) + unkerrs []unkerr + ) + + newReq := func(txns ...kmsg.AddPartitionsToTxnRequestTransaction) *kmsg.AddPartitionsToTxnRequest { + req := kmsg.NewPtrAddPartitionsToTxnRequest() + req.Transactions = txns + addPartitionsTxnToReq(req) + return req + } + + for _, txn := range req.Transactions { + berr := coordinators[txn.TransactionalID] + var ke *kerr.Error + switch { + case berr.err == nil: + brokerReq := brokerReqs[berr.b.meta.NodeID] + if brokerReq == nil { + brokerReq = newReq(txn) + brokerReqs[berr.b.meta.NodeID] = brokerReq + } else { + brokerReq.Transactions = append(brokerReq.Transactions, txn) + } + case errors.As(berr.err, &ke): + kerrs[ke] = append(kerrs[ke], txn) + default: + unkerrs = append(unkerrs, unkerr{berr.err, txn}) + } + } + + var issues []issueShard + for id, req := range brokerReqs { + issues = append(issues, issueShard{ + req: req, + broker: id, + }) + } + for _, unkerr := range unkerrs { + issues = append(issues, issueShard{ + req: newReq(unkerr.txn), + err: unkerr.err, + }) + } + for kerr, txns := range kerrs { + issues = append(issues, issueShard{ + req: newReq(txns...), + err: kerr, + }) + } + + return issues, true, nil // reshardable to load correct coordinators +} + +func (cl *addPartitionsToTxnSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error { + req := kreq.(*kmsg.AddPartitionsToTxnRequest) + resp := kresp.(*kmsg.AddPartitionsToTxnResponse) + + // We default to the top level error, which is used in v4+. For v3 + // (case 0), we use the per-partition error, which is the same for + // every partition on not_coordinator errors. + code := resp.ErrorCode + if code == 0 && len(resp.Transactions) == 0 { + // Convert v3 and prior to v4+ + resptxn := kmsg.NewAddPartitionsToTxnResponseTransaction() + resptxn.TransactionalID = req.TransactionalID + for _, rt := range resp.Topics { + respt := kmsg.NewAddPartitionsToTxnResponseTransactionTopic() + respt.Topic = rt.Topic + for _, rp := range rt.Partitions { + respp := kmsg.NewAddPartitionsToTxnResponseTransactionTopicPartition() + respp.Partition = rp.Partition + respp.ErrorCode = rp.ErrorCode + code = rp.ErrorCode // v3 and prior has per-partition errors, not top level + respt.Partitions = append(respt.Partitions, respp) + } + resptxn.Topics = append(resptxn.Topics, respt) + } + resp.Transactions = append(resp.Transactions, resptxn) + } else { + // Convert v4 to v3 and prior: either we have a top level error + // code or we have at least one transaction. + // + // If the code is non-zero, we convert it to per-partition error + // codes; v3 does not have a top level err. 
+ addPartitionsTxnToResp(resp) + if code != 0 { + for _, reqt := range req.Topics { + respt := kmsg.NewAddPartitionsToTxnResponseTopic() + respt.Topic = reqt.Topic + for _, reqp := range reqt.Partitions { + respp := kmsg.NewAddPartitionsToTxnResponseTopicPartition() + respp.Partition = reqp + respp.ErrorCode = resp.ErrorCode + respt.Partitions = append(respt.Partitions, respp) + } + resp.Topics = append(resp.Topics, respt) + } + } + } + if err := kerr.ErrorForCode(code); cl.maybeDeleteStaleCoordinator(req.TransactionalID, coordinatorTypeTxn, err) { + return err + } + return nil +} + +func (*addPartitionsToTxnSharder) merge(sresps []ResponseShard) (kmsg.Response, error) { + merged := kmsg.NewPtrAddPartitionsToTxnResponse() + + firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) { + resp := kresp.(*kmsg.AddPartitionsToTxnResponse) + merged.Version = resp.Version + merged.ThrottleMillis = resp.ThrottleMillis + merged.ErrorCode = resp.ErrorCode + merged.Transactions = append(merged.Transactions, resp.Transactions...) + }) + addPartitionsTxnToResp(merged) + return merged, firstErr +} + // handle sharding WriteTxnMarkersRequest type writeTxnMarkersSharder struct{ *Client } From 2a3b6bd0d045f52b9dc802176a8d6448508edeeb Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 19 Oct 2023 17:05:55 -0600 Subject: [PATCH 3/3] pkg/kversion: detect 3.6 This does not yet add V3_6_0 since that will require a minor. --- pkg/kversion/kversion.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index facad4e2..f6d27c97 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -304,6 +304,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { {max330, "v3.3"}, {max340, "v3.4"}, {max350, "v3.5"}, + {max360, "v3.6"}, } { for k, v := range comparison.cmp.filter(cfg.listener) { if v == -1 { @@ -445,6 +446,10 @@ func V3_3_0() *Versions { return zkBrokerOf(max330) } func V3_4_0() *Versions { return zkBrokerOf(max340) } func V3_5_0() *Versions { return zkBrokerOf(max350) } +/* TODO wait for franz-go v1.16 +func V3_6_0() *Versions { return zkBrokerOf(max360) } +*/ + func zkBrokerOf(lks listenerKeys) *Versions { return &Versions{lks.filter(zkBroker)} } @@ -1051,8 +1056,15 @@ var max350 = nextMax(max340, func(v listenerKeys) listenerKeys { return v }) +var max360 = nextMax(max350, func(v listenerKeys) listenerKeys { + // KAFKA-14402 29a1a16668d76a1cc04ec9e39ea13026f2dce1de KIP-890 + // Later commit swapped to stable + v[24].inc() // 4 add partitions to txn + return v +}) + var ( - maxStable = max350 + maxStable = max360 maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { return v })
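
The examples below are illustrative sketches added for context; they are not
part of the patches above.

First, a minimal sketch of what a kadm-style caller could do now that
AddPartitionsToTxn is sharded: build a v4-style batched request and let
kgo.Client.Request resolve the transaction coordinator for each transactional
ID, split the request per coordinator, and merge the shard responses. The
broker address, transactional ID, topic, and producer ID/epoch are
placeholders; a real caller would use values from an actual InitProducerID.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	req := kmsg.NewPtrAddPartitionsToTxnRequest()

	txn := kmsg.NewAddPartitionsToTxnRequestTransaction()
	txn.TransactionalID = "example-txn" // placeholder transactional ID
	txn.ProducerID = 12345              // placeholder; from InitProducerID in practice
	txn.ProducerEpoch = 0

	topic := kmsg.NewAddPartitionsToTxnRequestTransactionTopic()
	topic.Topic = "example-topic" // placeholder topic
	topic.Partitions = []int32{0, 1}
	txn.Topics = append(txn.Topics, topic)

	req.Transactions = append(req.Transactions, txn)

	// The client shards this per transactional ID's coordinator and merges
	// the shard responses back into one AddPartitionsToTxnResponse.
	kresp, err := cl.Request(context.Background(), req)
	if err != nil {
		panic(err)
	}
	resp := kresp.(*kmsg.AddPartitionsToTxnResponse)
	for _, t := range resp.Transactions {
		fmt.Println(t.TransactionalID, len(t.Topics))
	}
}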
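
The heart of the new addPartitionsToTxnSharder is grouping transactions by
their transaction coordinator (via the client's internal loadCoordinators).
The same grouping can be sketched from the outside with a FindCoordinator
request of coordinator type 1 (transactions); the transactional IDs and
broker address are placeholders, and this is only an illustration of the
idea since the client already caches and reshards coordinators internally.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	req := kmsg.NewPtrFindCoordinatorRequest()
	req.CoordinatorType = 1                          // 1 = transaction coordinators
	req.CoordinatorKeys = []string{"txn-a", "txn-b"} // placeholder transactional IDs

	resp, err := req.RequestWith(context.Background(), cl)
	if err != nil {
		panic(err)
	}

	// Group keys by coordinator node; real code should also check each
	// coordinator's ErrorCode. The sharder issues one AddPartitionsToTxn
	// per node, batching every transaction that maps to that node.
	byNode := make(map[int32][]string)
	for _, c := range resp.Coordinators {
		byNode[c.NodeID] = append(byNode[c.NodeID], c.Key)
	}
	fmt.Println(byNode)
}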
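
For the kversion change, a short sketch of how the new max360 table surfaces
to users: guess a broker's version from its ApiVersions response. Against a
Kafka 3.6 broker this now guesses "v3.6" where it previously topped out at
"v3.5". The broker address and client software name/version are placeholders,
and this assumes kversion's FromApiVersionsResponse helper for turning the
response into a Versions.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	req := kmsg.NewPtrApiVersionsRequest()
	req.ClientSoftwareName = "kversion-probe" // must be non-empty for ApiVersions v3+
	req.ClientSoftwareVersion = "0.1.0"

	resp, err := req.RequestWith(context.Background(), cl)
	if err != nil {
		panic(err)
	}

	vs := kversion.FromApiVersionsResponse(resp)
	fmt.Println(vs.VersionGuess()) // prints "v3.6" against a 3.6 broker after this patch
}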