From 547e3b6be4db01387a389b663af9118ac5a38e1d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 25 Mar 2023 15:04:52 -0600 Subject: [PATCH] KIP-890 proto --- generate/definitions/24_add_partitions_to_txn | 44 +- pkg/kmsg/generated.go | 565 +++++++++++++++++- pkg/kversion/kversion.go | 13 +- 3 files changed, 598 insertions(+), 24 deletions(-) diff --git a/generate/definitions/24_add_partitions_to_txn b/generate/definitions/24_add_partitions_to_txn index cdec6913..dc8b757b 100644 --- a/generate/definitions/24_add_partitions_to_txn +++ b/generate/definitions/24_add_partitions_to_txn @@ -2,28 +2,60 @@ // partitions in the request. Before producing any records to a partition in // the transaction, that partition must have been added to the transaction with // this request. -AddPartitionsToTxnRequest => key 24, max version 3, flexible v3+, txn coordinator +// +// Versions 3 and below are exclusively used by clients and versions 4 and +// above are used by brokers. +// +// Version 4 adds VerifyOnly field to check if partitions are already in +// transaction and adds support to batch multiple transactions. +AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. - TransactionalID: string + TransactionalID: string // v0-v3 // ProducerID is the producer ID of the client for this transactional ID // as received from InitProducerID. - ProducerID: int64 + ProducerID: int64 // v0-v3 // ProducerEpoch is the producer epoch of the client for this transactional ID // as received from InitProducerID. - ProducerEpoch: int16 + ProducerEpoch: int16 // v0-v3 // Topics are topics to add as part of the producer side of a transaction. - Topics: [=>] + Topics: [=>] // v0-v3 // Topic is a topic name. Topic: string // Partitions are partitions within a topic to add as part of the producer // side of a transaction. Partitions: [int32] + // The list of transactions to add partitions to, for v4+, for brokers only. + // The fields in this are batch broker requests that duplicate the above fields + // and thus are undocumented (except VerifyOnly, which is new). + Transactions: [=>] // v4+ + TransactionalID: string + ProducerID: int64 + ProducerEpoch: int16 + // VerifyOnly signifies if we want to check if the partition is in the + // transaction rather than add it. + VerifyOnly: bool + Topics: [=>] + Topic: string + Partitions: [int32] // AddPartitionsToTxnResponse is a response to an AddPartitionsToTxnRequest. AddPartitionsToTxnResponse => ThrottleMillis(1) + // The response top level error code. + ErrorCode: int16 // v4+ + // Results categorized by transactional ID, v4+ only, for brokers only. + // The fields duplicate v3 and below fields (except TransactionalID) and + // are left undocumented. + Transactions: [=>] // v4+ + // The transactional id corresponding to the transaction. + TransactionalID: string + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 // Topics are responses to topics in the request. - Topics: [=>] + Topics: [=>] // v0-v3 // Topic is a topic being responded to. Topic: string // Partitions are responses to partitions in the request. diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index c9c240cd..10987d2f 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -19800,34 +19800,97 @@ func NewAddPartitionsToTxnRequestTopic() AddPartitionsToTxnRequestTopic { return v } +type AddPartitionsToTxnRequestTransactionTopic struct { + Topic string + + Partitions []int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnRequestTransactionTopic. +func (v *AddPartitionsToTxnRequestTransactionTopic) Default() { +} + +// NewAddPartitionsToTxnRequestTransactionTopic returns a default AddPartitionsToTxnRequestTransactionTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnRequestTransactionTopic() AddPartitionsToTxnRequestTransactionTopic { + var v AddPartitionsToTxnRequestTransactionTopic + v.Default() + return v +} + +type AddPartitionsToTxnRequestTransaction struct { + TransactionalID string + + ProducerID int64 + + ProducerEpoch int16 + + // VerifyOnly signifies if we want to check if the partition is in the + // transaction rather than add it. + VerifyOnly bool + + Topics []AddPartitionsToTxnRequestTransactionTopic + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnRequestTransaction. +func (v *AddPartitionsToTxnRequestTransaction) Default() { +} + +// NewAddPartitionsToTxnRequestTransaction returns a default AddPartitionsToTxnRequestTransaction +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnRequestTransaction() AddPartitionsToTxnRequestTransaction { + var v AddPartitionsToTxnRequestTransaction + v.Default() + return v +} + // AddPartitionsToTxnRequest begins the producer side of a transaction for all // partitions in the request. Before producing any records to a partition in // the transaction, that partition must have been added to the transaction with // this request. +// +// Versions 3 and below are exclusively used by clients and versions 4 and +// above are used by brokers. +// +// Version 4 adds VerifyOnly field to check if partitions are already in +// transaction and adds support to batch multiple transactions. type AddPartitionsToTxnRequest struct { // Version is the version of this message used with a Kafka broker. Version int16 // TransactionalID is the transactional ID to use for this request. - TransactionalID string + TransactionalID string // v0-v3 // ProducerID is the producer ID of the client for this transactional ID // as received from InitProducerID. - ProducerID int64 + ProducerID int64 // v0-v3 // ProducerEpoch is the producer epoch of the client for this transactional ID // as received from InitProducerID. - ProducerEpoch int16 + ProducerEpoch int16 // v0-v3 // Topics are topics to add as part of the producer side of a transaction. - Topics []AddPartitionsToTxnRequestTopic + Topics []AddPartitionsToTxnRequestTopic // v0-v3 + + // The list of transactions to add partitions to, for v4+, for brokers only. + // The fields in this are batch broker requests that duplicate the above fields + // and thus are undocumented (except VerifyOnly, which is new). + Transactions []AddPartitionsToTxnRequestTransaction // v4+ // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } func (*AddPartitionsToTxnRequest) Key() int16 { return 24 } -func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 3 } +func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 4 } func (v *AddPartitionsToTxnRequest) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnRequest) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -19852,7 +19915,7 @@ func (v *AddPartitionsToTxnRequest) AppendTo(dst []byte) []byte { _ = version isFlexible := version >= 3 _ = isFlexible - { + if version >= 0 && version <= 3 { v := v.TransactionalID if isFlexible { dst = kbin.AppendCompactString(dst, v) @@ -19860,15 +19923,15 @@ func (v *AddPartitionsToTxnRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } - { + if version >= 0 && version <= 3 { v := v.ProducerID dst = kbin.AppendInt64(dst, v) } - { + if version >= 0 && version <= 3 { v := v.ProducerEpoch dst = kbin.AppendInt16(dst, v) } - { + if version >= 0 && version <= 3 { v := v.Topics if isFlexible { dst = kbin.AppendCompactArrayLen(dst, len(v)) @@ -19903,6 +19966,76 @@ func (v *AddPartitionsToTxnRequest) AppendTo(dst []byte) []byte { } } } + if version >= 4 { + v := v.Transactions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TransactionalID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.ProducerID + dst = kbin.AppendInt64(dst, v) + } + { + v := v.ProducerEpoch + dst = kbin.AppendInt16(dst, v) + } + { + v := v.VerifyOnly + dst = kbin.AppendBool(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -19926,7 +20059,7 @@ func (v *AddPartitionsToTxnRequest) readFrom(src []byte, unsafe bool) error { isFlexible := version >= 3 _ = isFlexible s := v - { + if version >= 0 && version <= 3 { var v string if unsafe { if isFlexible { @@ -19943,15 +20076,15 @@ func (v *AddPartitionsToTxnRequest) readFrom(src []byte, unsafe bool) error { } s.TransactionalID = v } - { + if version >= 0 && version <= 3 { v := b.Int64() s.ProducerID = v } - { + if version >= 0 && version <= 3 { v := b.Int16() s.ProducerEpoch = v } - { + if version >= 0 && version <= 3 { v := s.Topics a := v var l int32 @@ -20018,6 +20151,129 @@ func (v *AddPartitionsToTxnRequest) readFrom(src []byte, unsafe bool) error { v = a s.Topics = v } + if version >= 4 { + v := s.Transactions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnRequestTransaction, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.TransactionalID = v + } + { + v := b.Int64() + s.ProducerID = v + } + { + v := b.Int16() + s.ProducerEpoch = v + } + { + v := b.Bool() + s.VerifyOnly = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnRequestTransactionTopic, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Transactions = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -20045,6 +20301,73 @@ func NewAddPartitionsToTxnRequest() AddPartitionsToTxnRequest { return v } +type AddPartitionsToTxnResponseTransactionTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnResponseTransactionTopicPartition. +func (v *AddPartitionsToTxnResponseTransactionTopicPartition) Default() { +} + +// NewAddPartitionsToTxnResponseTransactionTopicPartition returns a default AddPartitionsToTxnResponseTransactionTopicPartition +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnResponseTransactionTopicPartition() AddPartitionsToTxnResponseTransactionTopicPartition { + var v AddPartitionsToTxnResponseTransactionTopicPartition + v.Default() + return v +} + +type AddPartitionsToTxnResponseTransactionTopic struct { + Topic string + + Partitions []AddPartitionsToTxnResponseTransactionTopicPartition + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnResponseTransactionTopic. +func (v *AddPartitionsToTxnResponseTransactionTopic) Default() { +} + +// NewAddPartitionsToTxnResponseTransactionTopic returns a default AddPartitionsToTxnResponseTransactionTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnResponseTransactionTopic() AddPartitionsToTxnResponseTransactionTopic { + var v AddPartitionsToTxnResponseTransactionTopic + v.Default() + return v +} + +type AddPartitionsToTxnResponseTransaction struct { + // The transactional id corresponding to the transaction. + TransactionalID string + + Topics []AddPartitionsToTxnResponseTransactionTopic + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnResponseTransaction. +func (v *AddPartitionsToTxnResponseTransaction) Default() { +} + +// NewAddPartitionsToTxnResponseTransaction returns a default AddPartitionsToTxnResponseTransaction +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnResponseTransaction() AddPartitionsToTxnResponseTransaction { + var v AddPartitionsToTxnResponseTransaction + v.Default() + return v +} + type AddPartitionsToTxnResponseTopicPartition struct { // Partition is a partition being responded to. Partition int32 @@ -20137,15 +20460,23 @@ type AddPartitionsToTxnResponse struct { // This request switched at version 1. ThrottleMillis int32 + // The response top level error code. + ErrorCode int16 // v4+ + + // Results categorized by transactional ID, v4+ only, for brokers only. + // The fields duplicate v3 and below fields (except TransactionalID) and + // are left undocumented. + Transactions []AddPartitionsToTxnResponseTransaction // v4+ + // Topics are responses to topics in the request. - Topics []AddPartitionsToTxnResponseTopic + Topics []AddPartitionsToTxnResponseTopic // v0-v3 // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } func (*AddPartitionsToTxnResponse) Key() int16 { return 24 } -func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 3 } +func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 4 } func (v *AddPartitionsToTxnResponse) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnResponse) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -20170,7 +20501,80 @@ func (v *AddPartitionsToTxnResponse) AppendTo(dst []byte) []byte { v := v.ThrottleMillis dst = kbin.AppendInt32(dst, v) } - { + if version >= 4 { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + if version >= 4 { + v := v.Transactions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TransactionalID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if version >= 0 && version <= 3 { v := v.Topics if isFlexible { dst = kbin.AppendCompactArrayLen(dst, len(v)) @@ -20243,7 +20647,134 @@ func (v *AddPartitionsToTxnResponse) readFrom(src []byte, unsafe bool) error { v := b.Int32() s.ThrottleMillis = v } - { + if version >= 4 { + v := b.Int16() + s.ErrorCode = v + } + if version >= 4 { + v := s.Transactions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnResponseTransaction, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.TransactionalID = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnResponseTransactionTopic, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnResponseTransactionTopicPartition, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Transactions = v + } + if version >= 0 && version <= 3 { v := s.Topics a := v var l int32 diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index facad4e2..5914de09 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -304,6 +304,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { {max330, "v3.3"}, {max340, "v3.4"}, {max350, "v3.5"}, + {max360, "v3.6"}, } { for k, v := range comparison.cmp.filter(cfg.listener) { if v == -1 { @@ -445,6 +446,10 @@ func V3_3_0() *Versions { return zkBrokerOf(max330) } func V3_4_0() *Versions { return zkBrokerOf(max340) } func V3_5_0() *Versions { return zkBrokerOf(max350) } +/* TODO wait for franz-go v1.16 +func V3_6_0() *Versions { return zkBrokerOf(max360) } +*/ + func zkBrokerOf(lks listenerKeys) *Versions { return &Versions{lks.filter(zkBroker)} } @@ -1051,8 +1056,14 @@ var max350 = nextMax(max340, func(v listenerKeys) listenerKeys { return v }) +var max360 = nextMax(max350, func(v listenerKeys) listenerKeys { + // KAFKA-14402 29a1a16668d76a1cc04ec9e39ea13026f2dce1de KIP-890 + v[24].inc() // 4 add partitions to txn + return v +}) + var ( - maxStable = max350 + maxStable = max360 maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { return v })