Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-890 (part 1) #600

Merged
merged 3 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
github.com/klauspost/compress v1.16.7
github.com/pierrec/lz4/v4 v4.1.18
github.com/twmb/franz-go/pkg/kmsg v1.6.1
github.com/twmb/franz-go/pkg/kmsg v1.7.0
golang.org/x/crypto v0.11.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGC
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM=
github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
219 changes: 210 additions & 9 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
*kmsg.ListGroupsRequest, // key 16
*kmsg.DeleteRecordsRequest, // key 21
*kmsg.OffsetForLeaderEpochRequest, // key 23
*kmsg.AddPartitionsToTxnRequest, // key 24
*kmsg.WriteTxnMarkersRequest, // key 27
*kmsg.DescribeConfigsRequest, // key 32
*kmsg.AlterConfigsRequest, // key 33
*kmsg.AlterReplicaLogDirsRequest, // key 34
Expand Down Expand Up @@ -1775,8 +1777,6 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
// names, we delete no coordinator.
coordinator, resp, err := cl.handleReqWithCoordinator(ctx, func() (*broker, error) { return cl.broker(), nil }, coordinatorTypeTxn, "", req)
return shard(coordinator, req, resp, err)
case *kmsg.AddPartitionsToTxnRequest:
return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req)
case *kmsg.AddOffsetsToTxnRequest:
return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeTxn, t.TransactionalID, req)
case *kmsg.EndTxnRequest:
Expand Down Expand Up @@ -1840,10 +1840,6 @@ func (cl *Client) handleReqWithCoordinator(
// TXN
case *kmsg.InitProducerIDResponse:
code = t.ErrorCode
case *kmsg.AddPartitionsToTxnResponse:
if len(t.Topics) > 0 && len(t.Topics[0].Partitions) > 0 {
code = t.Topics[0].Partitions[0].ErrorCode
}
case *kmsg.AddOffsetsToTxnResponse:
code = t.ErrorCode
case *kmsg.EndTxnResponse:
Expand Down Expand Up @@ -2080,6 +2076,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
sharder = &deleteRecordsSharder{cl}
case *kmsg.OffsetForLeaderEpochRequest:
sharder = &offsetForLeaderEpochSharder{cl}
case *kmsg.AddPartitionsToTxnRequest:
sharder = &addPartitionsToTxnSharder{cl}
case *kmsg.WriteTxnMarkersRequest:
sharder = &writeTxnMarkersSharder{cl}
case *kmsg.DescribeConfigsRequest:
Expand Down Expand Up @@ -2767,9 +2765,16 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
broker: id,
})
}
} else if len(req.Groups) == 1 {
single := offsetFetchGroupToReq(req.RequireStable, req.Groups[0])
single.Groups = req.Groups
issues = append(issues, issueShard{
req: single,
broker: id,
})
} else {
issues = append(issues, issueShard{
req: &pinReq{Request: req, pinMin: true, min: 8},
req: &pinReq{Request: req, pinMin: len(req.Groups) > 1, min: 8},
broker: id,
})
}
Expand All @@ -2791,7 +2796,7 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last
}

func (cl *offsetFetchSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
req := kreq.(*kmsg.OffsetFetchRequest) // we always issue pinned requests
req := kreq.(*kmsg.OffsetFetchRequest)
resp := kresp.(*kmsg.OffsetFetchResponse)

switch len(resp.Groups) {
Expand Down Expand Up @@ -2876,9 +2881,16 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
for key := range uniq {
req.CoordinatorKeys = append(req.CoordinatorKeys, key)
}
if len(req.CoordinatorKeys) == 1 {
req.CoordinatorKey = req.CoordinatorKeys[0]
}

splitReq := errors.Is(lastErr, errBrokerTooOld)
if !splitReq {
// With only one key, we do not need to split nor pin this.
if len(req.CoordinatorKeys) <= 1 {
return []issueShard{{req: req, any: true}}, false, nil
}
return []issueShard{{
req: &pinReq{Request: req, pinMin: true, min: 4},
any: true,
Expand All @@ -2899,7 +2911,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
}

func (*findCoordinatorSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
req := kreq.(*kmsg.FindCoordinatorRequest) // we always issue pinned requests
req := kreq.(*kmsg.FindCoordinatorRequest)
resp := kresp.(*kmsg.FindCoordinatorResponse)

switch len(resp.Coordinators) {
Expand Down Expand Up @@ -3293,6 +3305,195 @@ func (*offsetForLeaderEpochSharder) merge(sresps []ResponseShard) (kmsg.Response
return merged, firstErr
}

// handle sharding AddPartitionsToTXn, where v4+ switched to batch requests
type addPartitionsToTxnSharder struct{ *Client }

func addPartitionsReqToTxn(req *kmsg.AddPartitionsToTxnRequest) {
t := kmsg.NewAddPartitionsToTxnRequestTransaction()
t.TransactionalID = req.TransactionalID
t.ProducerID = req.ProducerID
t.ProducerEpoch = req.ProducerEpoch
for i := range req.Topics {
rt := &req.Topics[i]
tt := kmsg.NewAddPartitionsToTxnRequestTransactionTopic()
tt.Topic = rt.Topic
tt.Partitions = rt.Partitions
t.Topics = append(t.Topics, tt)
}
req.Transactions = append(req.Transactions, t)
}

func addPartitionsTxnToReq(req *kmsg.AddPartitionsToTxnRequest) {
if len(req.Transactions) != 1 {
return
}
t0 := &req.Transactions[0]
req.TransactionalID = t0.TransactionalID
req.ProducerID = t0.ProducerID
req.ProducerEpoch = t0.ProducerEpoch
for _, tt := range t0.Topics {
rt := kmsg.NewAddPartitionsToTxnRequestTopic()
rt.Topic = tt.Topic
rt.Partitions = tt.Partitions
req.Topics = append(req.Topics, rt)
}
}

func addPartitionsTxnToResp(resp *kmsg.AddPartitionsToTxnResponse) {
if len(resp.Transactions) == 0 {
return
}
t0 := &resp.Transactions[0]
for _, tt := range t0.Topics {
rt := kmsg.NewAddPartitionsToTxnResponseTopic()
rt.Topic = tt.Topic
for _, tp := range tt.Partitions {
rp := kmsg.NewAddPartitionsToTxnResponseTopicPartition()
rp.Partition = tp.Partition
rp.ErrorCode = tp.ErrorCode
rt.Partitions = append(rt.Partitions, rp)
}
resp.Topics = append(resp.Topics, rt)
}
}

func (cl *addPartitionsToTxnSharder) shard(ctx context.Context, kreq kmsg.Request, _ error) ([]issueShard, bool, error) {
req := kreq.(*kmsg.AddPartitionsToTxnRequest)

if len(req.Transactions) == 0 {
addPartitionsReqToTxn(req)
}
txnIDs := make([]string, 0, len(req.Transactions))
for i := range req.Transactions {
txnIDs = append(txnIDs, req.Transactions[i].TransactionalID)
}
coordinators := cl.loadCoordinators(ctx, coordinatorTypeTxn, txnIDs...)

type unkerr struct {
err error
txn kmsg.AddPartitionsToTxnRequestTransaction
}
var (
brokerReqs = make(map[int32]*kmsg.AddPartitionsToTxnRequest)
kerrs = make(map[*kerr.Error][]kmsg.AddPartitionsToTxnRequestTransaction)
unkerrs []unkerr
)

newReq := func(txns ...kmsg.AddPartitionsToTxnRequestTransaction) *kmsg.AddPartitionsToTxnRequest {
req := kmsg.NewPtrAddPartitionsToTxnRequest()
req.Transactions = txns
addPartitionsTxnToReq(req)
return req
}

for _, txn := range req.Transactions {
berr := coordinators[txn.TransactionalID]
var ke *kerr.Error
switch {
case berr.err == nil:
brokerReq := brokerReqs[berr.b.meta.NodeID]
if brokerReq == nil {
brokerReq = newReq(txn)
brokerReqs[berr.b.meta.NodeID] = brokerReq
} else {
brokerReq.Transactions = append(brokerReq.Transactions, txn)
}
case errors.As(berr.err, &ke):
kerrs[ke] = append(kerrs[ke], txn)
default:
unkerrs = append(unkerrs, unkerr{berr.err, txn})
}
}

var issues []issueShard
for id, req := range brokerReqs {
issues = append(issues, issueShard{
req: req,
broker: id,
})
}
for _, unkerr := range unkerrs {
issues = append(issues, issueShard{
req: newReq(unkerr.txn),
err: unkerr.err,
})
}
for kerr, txns := range kerrs {
issues = append(issues, issueShard{
req: newReq(txns...),
err: kerr,
})
}

return issues, true, nil // reshardable to load correct coordinators
}

func (cl *addPartitionsToTxnSharder) onResp(kreq kmsg.Request, kresp kmsg.Response) error {
req := kreq.(*kmsg.AddPartitionsToTxnRequest)
resp := kresp.(*kmsg.AddPartitionsToTxnResponse)

// We default to the top level error, which is used in v4+. For v3
// (case 0), we use the per-partition error, which is the same for
// every partition on not_coordinator errors.
code := resp.ErrorCode
if code == 0 && len(resp.Transactions) == 0 {
// Convert v3 and prior to v4+
resptxn := kmsg.NewAddPartitionsToTxnResponseTransaction()
resptxn.TransactionalID = req.TransactionalID
for _, rt := range resp.Topics {
respt := kmsg.NewAddPartitionsToTxnResponseTransactionTopic()
respt.Topic = rt.Topic
for _, rp := range rt.Partitions {
respp := kmsg.NewAddPartitionsToTxnResponseTransactionTopicPartition()
respp.Partition = rp.Partition
respp.ErrorCode = rp.ErrorCode
code = rp.ErrorCode // v3 and prior has per-partition errors, not top level
respt.Partitions = append(respt.Partitions, respp)
}
resptxn.Topics = append(resptxn.Topics, respt)
}
resp.Transactions = append(resp.Transactions, resptxn)
} else {
// Convert v4 to v3 and prior: either we have a top level error
// code or we have at least one transaction.
//
// If the code is non-zero, we convert it to per-partition error
// codes; v3 does not have a top level err.
addPartitionsTxnToResp(resp)
if code != 0 {
for _, reqt := range req.Topics {
respt := kmsg.NewAddPartitionsToTxnResponseTopic()
respt.Topic = reqt.Topic
for _, reqp := range reqt.Partitions {
respp := kmsg.NewAddPartitionsToTxnResponseTopicPartition()
respp.Partition = reqp
respp.ErrorCode = resp.ErrorCode
respt.Partitions = append(respt.Partitions, respp)
}
resp.Topics = append(resp.Topics, respt)
}
}
}
if err := kerr.ErrorForCode(code); cl.maybeDeleteStaleCoordinator(req.TransactionalID, coordinatorTypeTxn, err) {
return err
}
return nil
}

func (*addPartitionsToTxnSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
merged := kmsg.NewPtrAddPartitionsToTxnResponse()

firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) {
resp := kresp.(*kmsg.AddPartitionsToTxnResponse)
merged.Version = resp.Version
merged.ThrottleMillis = resp.ThrottleMillis
merged.ErrorCode = resp.ErrorCode
merged.Transactions = append(merged.Transactions, resp.Transactions...)
})
addPartitionsTxnToResp(merged)
return merged, firstErr
}

// handle sharding WriteTxnMarkersRequest
type writeTxnMarkersSharder struct{ *Client }

Expand Down
14 changes: 13 additions & 1 deletion pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
{max330, "v3.3"},
{max340, "v3.4"},
{max350, "v3.5"},
{max360, "v3.6"},
} {
for k, v := range comparison.cmp.filter(cfg.listener) {
if v == -1 {
Expand Down Expand Up @@ -445,6 +446,10 @@ func V3_3_0() *Versions { return zkBrokerOf(max330) }
func V3_4_0() *Versions { return zkBrokerOf(max340) }
func V3_5_0() *Versions { return zkBrokerOf(max350) }

/* TODO wait for franz-go v1.16
func V3_6_0() *Versions { return zkBrokerOf(max360) }
*/

func zkBrokerOf(lks listenerKeys) *Versions {
return &Versions{lks.filter(zkBroker)}
}
Expand Down Expand Up @@ -1051,8 +1056,15 @@ var max350 = nextMax(max340, func(v listenerKeys) listenerKeys {
return v
})

var max360 = nextMax(max350, func(v listenerKeys) listenerKeys {
// KAFKA-14402 29a1a16668d76a1cc04ec9e39ea13026f2dce1de KIP-890
// Later commit swapped to stable
v[24].inc() // 4 add partitions to txn
return v
})

var (
maxStable = max350
maxStable = max360
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
return v
})
Expand Down