From 5e4f93465e6368d830785c89c601a2356b5bc66f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 2 Apr 2024 13:49:05 -0600 Subject: [PATCH 1/5] KIP-951 proto --- generate/definitions/00_produce | 18 +++++++++++++++++- generate/definitions/01_fetch | 13 ++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/generate/definitions/00_produce b/generate/definitions/00_produce index 512e2eed..4afd2651 100644 --- a/generate/definitions/00_produce +++ b/generate/definitions/00_produce @@ -6,7 +6,7 @@ // Note that the special client ID "__admin_client" will allow you to produce // records to internal topics. This is generally recommended if you want to // break your Kafka cluster. -ProduceRequest => key 0, max version 9, flexible v9+ +ProduceRequest => key 0, max version 10, flexible v9+ // TransactionID is the transaction ID to use for this request, allowing for // exactly once semantics. TransactionID: nullable-string // v3+ @@ -145,4 +145,20 @@ ProduceResponse => // ErrorMessage is the global error message of of what caused this batch // to error. ErrorMessage: nullable-string // v8+ + CurrentLeader: => // tag 0 + // The ID of the current leader, or -1 if unknown. + LeaderID: int32(-1) + // The latest known leader epoch. + LeaderEpoch: int32(-1) ThrottleMillis(6) // v1+ + // Brokers is present if any partition responses contain the error + // NOT_LEADER_OR_FOLLOWER. + Brokers: [=>] // tag 0 + // NodeID is the node ID of a Kafka broker. + NodeID: int32 + // Host is the hostname of a Kafka broker. + Host: string + // Port is the port of a Kafka broker. + Port: int32 + // Rack is the rack this Kafka broker is in. + Rack: nullable-string diff --git a/generate/definitions/01_fetch b/generate/definitions/01_fetch index 5c25163d..f1198d4e 100644 --- a/generate/definitions/01_fetch +++ b/generate/definitions/01_fetch @@ -13,7 +13,7 @@ // // Version 15 adds the ReplicaState which includes new field ReplicaEpoch and // the ReplicaID, and deprecates the old ReplicaID (KIP-903). -FetchRequest => key 1, max version 15, flexible v12+ +FetchRequest => key 1, max version 16, flexible v12+ // The cluster ID, if known. This is used to validate metadata fetches // prior to broker registration. ClusterID: nullable-string(null) // tag 0 @@ -229,3 +229,14 @@ FetchResponse => // Starting v4, this transitioned to the RecordBatch format (thus this // contains many RecordBatch structs). RecordBatches: nullable-bytes + // Brokers is present if any partition responses contain the error + // NOT_LEADER_OR_FOLLOWER. + Brokers: [=>] // tag 0 + // NodeID is the node ID of a Kafka broker. + NodeID: int32 + // Host is the hostname of a Kafka broker. + Host: string + // Port is the port of a Kafka broker. + Port: int32 + // Rack is the rack this Kafka broker is in. + Rack: nullable-string From 0625326fda995fe9d76ea70da1b0c023cacba2c2 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 2 Apr 2024 13:52:42 -0600 Subject: [PATCH 2/5] KIP-848 proto for OffsetCommit, OffsetFetch These are now stable in 3.7 --- generate/definitions/08_offset_commit | 2 +- generate/definitions/09_offset_fetch | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/generate/definitions/08_offset_commit b/generate/definitions/08_offset_commit index 21859c3f..8c592221 100644 --- a/generate/definitions/08_offset_commit +++ b/generate/definitions/08_offset_commit @@ -1,6 +1,6 @@ // OffsetCommitRequest commits offsets for consumed topics / partitions in // a group. -OffsetCommitRequest => key 8, max version 8, flexible v8+, group coordinator +OffsetCommitRequest => key 8, max version 9, flexible v8+, group coordinator // Group is the group this request is committing offsets to. Group: string // Generation being -1 and group being empty means the group is being used diff --git a/generate/definitions/09_offset_fetch b/generate/definitions/09_offset_fetch index 8a87dc6d..3b1f3b31 100644 --- a/generate/definitions/09_offset_fetch +++ b/generate/definitions/09_offset_fetch @@ -1,6 +1,6 @@ // OffsetFetchRequest requests the most recent committed offsets for topic // partitions in a group. -OffsetFetchRequest => key 9, max version 8, flexible v6+, group coordinator +OffsetFetchRequest => key 9, max version 9, flexible v6+, group coordinator // Group is the group to fetch offsets for. Group: string // v0-v7 // Topics contains topics to fetch offets for. Version 2+ allows this to be @@ -17,6 +17,10 @@ OffsetFetchRequest => key 9, max version 8, flexible v6+, group coordinator // are left undocumented. Refer to the top level documentation if necessary. Groups: [=>] // v8+ Group: string + // The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848). + MemberID: nullable-string // v9+ + // The member epoch if using the new consumer protocol (KIP-848). + MemberEpoch: int32(-1) // v9+ Topics: nullable[=>] Topic: string Partitions: [int32] From 5c40e667a60e8cdee409a7edbf473a0223d18f90 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 2 Apr 2024 13:56:50 -0600 Subject: [PATCH 3/5] KIP-919 proto Minor change, requires a note in kversion but nothing more --- generate/definitions/60_describe_cluster | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/generate/definitions/60_describe_cluster b/generate/definitions/60_describe_cluster index 8674cde9..aee933c5 100644 --- a/generate/definitions/60_describe_cluster +++ b/generate/definitions/60_describe_cluster @@ -1,10 +1,12 @@ // Introduced for KIP-700, DescribeClusterRequest is effectively an "admin" // type metadata request for information that producers or consumers do not // need to care about. -DescribeClusterRequest => key 60, max version 0, flexible v0+ +DescribeClusterRequest => key 60, max version 1, flexible v0+ // Whether to include cluster authorized operations. This requires DESCRIBE // on CLUSTER. IncludeClusterAuthorizedOperations: bool + // The endpoint type to describe. 1=brokers, 2=controllers. + EndpointType: int8(1) // v1+ // DescribeClusterResponse is a response to a DescribeClusterRequest. DescribeClusterResponse => @@ -13,6 +15,8 @@ DescribeClusterResponse => ErrorCode: int16 // The top level error message, if any. ErrorMessage: nullable-string + // The endpoint type that was described. 1=brokers, 2=controllers. + EndpointType: int8(1) // v1+ // The cluster ID that responding broker belongs to. ClusterID: string // The ID of the controller broker. From b5869b03f7ef010abc042361616a7627586cc561 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 2 Apr 2024 14:34:25 -0600 Subject: [PATCH 4/5] KIP-848: add ConsumerGroupHeartbeat proto --- .../definitions/68_consumer_group_heartbeat | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 generate/definitions/68_consumer_group_heartbeat diff --git a/generate/definitions/68_consumer_group_heartbeat b/generate/definitions/68_consumer_group_heartbeat new file mode 100644 index 00000000..27564e74 --- /dev/null +++ b/generate/definitions/68_consumer_group_heartbeat @@ -0,0 +1,61 @@ +// ConsumerGroupHeartbeat is a part of KIP-848; there are a lot of details +// to this request so documentation is left to the KIP itself. +ConsumerGroupHeartbeatRequest => key 68, max version 0, flexible v0+ + // The group ID. + Group: string + // The member ID generated by the coordinator. This must be kept during + // the entire lifetime of the member. + MemberID: string + // The current member epoch; 0 to join the group, -1 to leave, -2 to + // indicate that the static member will rejoin. + MemberEpoch: int32 + // Instance ID of the member; null if not provided or if unchanging. + InstanceID: nullable-string + // The rack ID of the member; null if not provided or if unchanging. + RackID: nullable-string + // RebalanceTimeoutMillis is how long the coordinator will wait on a member + // to revoke its partitions. -1 if unchanging. + RebalanceTimeoutMillis: int32(-1) + // Subscribed topics; null if unchanging. + SubscribedTopicNames: nullable[string] + // The server side assignor to use; null if unchanging. + ServerAssignor: nullable-string + // Topic partitions owned by the member; null if unchanging. + Topics: nullable[=>] + // The topic ID. + TopicID: uuid + // The partitions. + Partitions: [int32] + +// ConsumerGroupHeartbeatResponse is returned from a ConsumerGroupHeartbeatRequest. +ConsumerGroupHeartbeatResponse => + ThrottleMillis + // ErrorCode is the error for this response. + // + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNKNOWN_MEMBER_ID (version 0+) + // - FENCED_MEMBER_EPOCH (version 0+) + // - UNSUPPORTED_ASSIGNOR (version 0+) + // - UNRELEASED_INSTANCE_ID (version 0+) + // - GROUP_MAX_SIZE_REACHED (version 0+) + ErrorCode: int16 + // A supplementary message if this errored. + ErrorMessage: nullable-string + // The member ID generated by the coordinator; provided when joining + // with MemberEpoch=0. + MemberID: nullable-string + // The member epoch. + MemberEpoch: int32 + // The heartbeat interval, in milliseconds. + HeartbeatIntervalMillis: int32 + // The assignment; null if not provided. + Assignment: nullable=> + // The topics partitions that can be used immediately. + Topics: [=>] + TopicID: uuid + Partitions: [int32] From f5106ae0c7280f7cb89db7bd7c870a93960a4767 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 2 Apr 2024 14:35:35 -0600 Subject: [PATCH 5/5] pkg/kmsg: commit generated protocols for 3.7 --- pkg/kmsg/generated.go | 1325 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 1308 insertions(+), 17 deletions(-) diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 10987d2f..75bff995 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -13,7 +13,7 @@ import ( // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 67 +const MaxKey = 68 // MessageV0 is the message format Kafka used prior to 0.10. // @@ -2783,7 +2783,7 @@ type ProduceRequest struct { } func (*ProduceRequest) Key() int16 { return 0 } -func (*ProduceRequest) MaxVersion() int16 { return 9 } +func (*ProduceRequest) MaxVersion() int16 { return 10 } func (v *ProduceRequest) SetVersion(version int16) { v.Version = version } func (v *ProduceRequest) GetVersion() int16 { return v.Version } func (v *ProduceRequest) IsFlexible() bool { return v.Version >= 9 } @@ -3059,6 +3059,36 @@ func NewProduceResponseTopicPartitionErrorRecord() ProduceResponseTopicPartition return v } +type ProduceResponseTopicPartitionCurrentLeader struct { + // The ID of the current leader, or -1 if unknown. + // + // This field has a default of -1. + LeaderID int32 + + // The latest known leader epoch. + // + // This field has a default of -1. + LeaderEpoch int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v9+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ProduceResponseTopicPartitionCurrentLeader. +func (v *ProduceResponseTopicPartitionCurrentLeader) Default() { + v.LeaderID = -1 + v.LeaderEpoch = -1 +} + +// NewProduceResponseTopicPartitionCurrentLeader returns a default ProduceResponseTopicPartitionCurrentLeader +// This is a shortcut for creating a struct and calling Default yourself. +func NewProduceResponseTopicPartitionCurrentLeader() ProduceResponseTopicPartitionCurrentLeader { + var v ProduceResponseTopicPartitionCurrentLeader + v.Default() + return v +} + type ProduceResponseTopicPartition struct { // Partition is the partition this response pertains to. Partition int32 @@ -3168,6 +3198,8 @@ type ProduceResponseTopicPartition struct { // to error. ErrorMessage *string // v8+ + CurrentLeader ProduceResponseTopicPartitionCurrentLeader // tag 0 + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v9+ } @@ -3177,6 +3209,12 @@ type ProduceResponseTopicPartition struct { func (v *ProduceResponseTopicPartition) Default() { v.LogAppendTime = -1 v.LogStartOffset = -1 + { + v := &v.CurrentLeader + _ = v + v.LeaderID = -1 + v.LeaderEpoch = -1 + } } // NewProduceResponseTopicPartition returns a default ProduceResponseTopicPartition @@ -3212,6 +3250,36 @@ func NewProduceResponseTopic() ProduceResponseTopic { return v } +type ProduceResponseBroker struct { + // NodeID is the node ID of a Kafka broker. + NodeID int32 + + // Host is the hostname of a Kafka broker. + Host string + + // Port is the port of a Kafka broker. + Port int32 + + // Rack is the rack this Kafka broker is in. + Rack *string + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v9+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ProduceResponseBroker. +func (v *ProduceResponseBroker) Default() { +} + +// NewProduceResponseBroker returns a default ProduceResponseBroker +// This is a shortcut for creating a struct and calling Default yourself. +func NewProduceResponseBroker() ProduceResponseBroker { + var v ProduceResponseBroker + v.Default() + return v +} + // ProduceResponse is returned from a ProduceRequest. type ProduceResponse struct { // Version is the version of this message used with a Kafka broker. @@ -3229,12 +3297,16 @@ type ProduceResponse struct { // This request switched at version 6. ThrottleMillis int32 // v1+ + // Brokers is present if any partition responses contain the error + // NOT_LEADER_OR_FOLLOWER. + Brokers []ProduceResponseBroker // tag 0 + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v9+ } func (*ProduceResponse) Key() int16 { return 0 } -func (*ProduceResponse) MaxVersion() int16 { return 9 } +func (*ProduceResponse) MaxVersion() int16 { return 10 } func (v *ProduceResponse) SetVersion(version int16) { v.Version = version } func (v *ProduceResponse) GetVersion() int16 { return v.Version } func (v *ProduceResponse) IsFlexible() bool { return v.Version >= 9 } @@ -3329,7 +3401,44 @@ func (v *ProduceResponse) AppendTo(dst []byte) []byte { } } if isFlexible { - dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + var toEncode []uint32 + if !reflect.DeepEqual(v.CurrentLeader, (func() ProduceResponseTopicPartitionCurrentLeader { + var v ProduceResponseTopicPartitionCurrentLeader + v.Default() + return v + })()) { + toEncode = append(toEncode, 0) + } + dst = kbin.AppendUvarint(dst, uint32(len(toEncode)+v.UnknownTags.Len())) + for _, tag := range toEncode { + switch tag { + case 0: + { + v := v.CurrentLeader + dst = kbin.AppendUvarint(dst, 0) + sized := false + lenAt := len(dst) + fCurrentLeader: + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + if !sized { + dst = kbin.AppendUvarint(dst[:lenAt], uint32(len(dst[lenAt:]))) + sized = true + goto fCurrentLeader + } + } + } + } dst = v.UnknownTags.AppendEach(dst) } } @@ -3345,7 +3454,64 @@ func (v *ProduceResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendInt32(dst, v) } if isFlexible { - dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + var toEncode []uint32 + if len(v.Brokers) > 0 { + toEncode = append(toEncode, 0) + } + dst = kbin.AppendUvarint(dst, uint32(len(toEncode)+v.UnknownTags.Len())) + for _, tag := range toEncode { + switch tag { + case 0: + { + v := v.Brokers + dst = kbin.AppendUvarint(dst, 0) + sized := false + lenAt := len(dst) + fBrokers: + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.NodeID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Host + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Port + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Rack + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + if !sized { + dst = kbin.AppendUvarint(dst[:lenAt], uint32(len(dst[lenAt:]))) + sized = true + goto fBrokers + } + } + } + } dst = v.UnknownTags.AppendEach(dst) } return dst @@ -3510,7 +3676,31 @@ func (v *ProduceResponse) readFrom(src []byte, unsafe bool) error { s.ErrorMessage = v } if isFlexible { - s.UnknownTags = internalReadTags(&b) + for i := b.Uvarint(); i > 0; i-- { + switch key := b.Uvarint(); key { + default: + s.UnknownTags.Set(key, b.Span(int(b.Uvarint()))) + case 0: + b := kbin.Reader{Src: b.Span(int(b.Uvarint()))} + v := &s.CurrentLeader + v.Default() + s := v + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + if err := b.Complete(); err != nil { + return err + } + } + } } } v = a @@ -3528,7 +3718,84 @@ func (v *ProduceResponse) readFrom(src []byte, unsafe bool) error { s.ThrottleMillis = v } if isFlexible { - s.UnknownTags = internalReadTags(&b) + for i := b.Uvarint(); i > 0; i-- { + switch key := b.Uvarint(); key { + default: + s.UnknownTags.Set(key, b.Span(int(b.Uvarint()))) + case 0: + b := kbin.Reader{Src: b.Span(int(b.Uvarint()))} + v := s.Brokers + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ProduceResponseBroker, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.NodeID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Host = v + } + { + v := b.Int32() + s.Port = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.Rack = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Brokers = v + if err := b.Complete(); err != nil { + return err + } + } + } } return b.Complete() } @@ -3786,7 +4053,7 @@ type FetchRequest struct { } func (*FetchRequest) Key() int16 { return 1 } -func (*FetchRequest) MaxVersion() int16 { return 15 } +func (*FetchRequest) MaxVersion() int16 { return 16 } func (v *FetchRequest) SetVersion(version int16) { v.Version = version } func (v *FetchRequest) GetVersion() int16 { return v.Version } func (v *FetchRequest) IsFlexible() bool { return v.Version >= 12 } @@ -4602,6 +4869,36 @@ func NewFetchResponseTopic() FetchResponseTopic { return v } +type FetchResponseBroker struct { + // NodeID is the node ID of a Kafka broker. + NodeID int32 + + // Host is the hostname of a Kafka broker. + Host string + + // Port is the port of a Kafka broker. + Port int32 + + // Rack is the rack this Kafka broker is in. + Rack *string + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v12+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to FetchResponseBroker. +func (v *FetchResponseBroker) Default() { +} + +// NewFetchResponseBroker returns a default FetchResponseBroker +// This is a shortcut for creating a struct and calling Default yourself. +func NewFetchResponseBroker() FetchResponseBroker { + var v FetchResponseBroker + v.Default() + return v +} + // FetchResponse is returned from a FetchRequest. type FetchResponse struct { // Version is the version of this message used with a Kafka broker. @@ -4635,12 +4932,16 @@ type FetchResponse struct { // for them. Topics []FetchResponseTopic + // Brokers is present if any partition responses contain the error + // NOT_LEADER_OR_FOLLOWER. + Brokers []FetchResponseBroker // tag 0 + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v12+ } func (*FetchResponse) Key() int16 { return 1 } -func (*FetchResponse) MaxVersion() int16 { return 15 } +func (*FetchResponse) MaxVersion() int16 { return 16 } func (v *FetchResponse) SetVersion(version int16) { v.Version = version } func (v *FetchResponse) GetVersion() int16 { return v.Version } func (v *FetchResponse) IsFlexible() bool { return v.Version >= 12 } @@ -4864,7 +5165,64 @@ func (v *FetchResponse) AppendTo(dst []byte) []byte { } } if isFlexible { - dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + var toEncode []uint32 + if len(v.Brokers) > 0 { + toEncode = append(toEncode, 0) + } + dst = kbin.AppendUvarint(dst, uint32(len(toEncode)+v.UnknownTags.Len())) + for _, tag := range toEncode { + switch tag { + case 0: + { + v := v.Brokers + dst = kbin.AppendUvarint(dst, 0) + sized := false + lenAt := len(dst) + fBrokers: + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.NodeID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Host + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Port + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Rack + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + if !sized { + dst = kbin.AppendUvarint(dst[:lenAt], uint32(len(dst[lenAt:]))) + sized = true + goto fBrokers + } + } + } + } dst = v.UnknownTags.AppendEach(dst) } return dst @@ -5107,7 +5465,84 @@ func (v *FetchResponse) readFrom(src []byte, unsafe bool) error { s.Topics = v } if isFlexible { - s.UnknownTags = internalReadTags(&b) + for i := b.Uvarint(); i > 0; i-- { + switch key := b.Uvarint(); key { + default: + s.UnknownTags.Set(key, b.Span(int(b.Uvarint()))) + case 0: + b := kbin.Reader{Src: b.Span(int(b.Uvarint()))} + v := s.Brokers + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]FetchResponseBroker, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.NodeID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Host = v + } + { + v := b.Int32() + s.Port = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.Rack = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Brokers = v + if err := b.Complete(); err != nil { + return err + } + } + } } return b.Complete() } @@ -10189,7 +10624,7 @@ type OffsetCommitRequest struct { } func (*OffsetCommitRequest) Key() int16 { return 8 } -func (*OffsetCommitRequest) MaxVersion() int16 { return 8 } +func (*OffsetCommitRequest) MaxVersion() int16 { return 9 } func (v *OffsetCommitRequest) SetVersion(version int16) { v.Version = version } func (v *OffsetCommitRequest) GetVersion() int16 { return v.Version } func (v *OffsetCommitRequest) IsFlexible() bool { return v.Version >= 8 } @@ -10625,7 +11060,7 @@ type OffsetCommitResponse struct { } func (*OffsetCommitResponse) Key() int16 { return 8 } -func (*OffsetCommitResponse) MaxVersion() int16 { return 8 } +func (*OffsetCommitResponse) MaxVersion() int16 { return 9 } func (v *OffsetCommitResponse) SetVersion(version int16) { v.Version = version } func (v *OffsetCommitResponse) GetVersion() int16 { return v.Version } func (v *OffsetCommitResponse) IsFlexible() bool { return v.Version >= 8 } @@ -10870,6 +11305,14 @@ func NewOffsetFetchRequestGroupTopic() OffsetFetchRequestGroupTopic { type OffsetFetchRequestGroup struct { Group string + // The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848). + MemberID *string // v9+ + + // The member epoch if using the new consumer protocol (KIP-848). + // + // This field has a default of -1. + MemberEpoch int32 // v9+ + Topics []OffsetFetchRequestGroupTopic // UnknownTags are tags Kafka sent that we do not know the purpose of. @@ -10879,6 +11322,7 @@ type OffsetFetchRequestGroup struct { // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to OffsetFetchRequestGroup. func (v *OffsetFetchRequestGroup) Default() { + v.MemberEpoch = -1 } // NewOffsetFetchRequestGroup returns a default OffsetFetchRequestGroup @@ -10920,7 +11364,7 @@ type OffsetFetchRequest struct { } func (*OffsetFetchRequest) Key() int16 { return 9 } -func (*OffsetFetchRequest) MaxVersion() int16 { return 8 } +func (*OffsetFetchRequest) MaxVersion() int16 { return 9 } func (v *OffsetFetchRequest) SetVersion(version int16) { v.Version = version } func (v *OffsetFetchRequest) GetVersion() int16 { return v.Version } func (v *OffsetFetchRequest) IsFlexible() bool { return v.Version >= 6 } @@ -11013,6 +11457,18 @@ func (v *OffsetFetchRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } + if version >= 9 { + v := v.MemberID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + if version >= 9 { + v := v.MemberEpoch + dst = kbin.AppendInt32(dst, v) + } { v := v.Topics if isFlexible { @@ -11205,6 +11661,27 @@ func (v *OffsetFetchRequest) readFrom(src []byte, unsafe bool) error { } s.Group = v } + if version >= 9 { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.MemberID = v + } + if version >= 9 { + v := b.Int32() + s.MemberEpoch = v + } { v := s.Topics a := v @@ -11504,7 +11981,7 @@ type OffsetFetchResponse struct { } func (*OffsetFetchResponse) Key() int16 { return 9 } -func (*OffsetFetchResponse) MaxVersion() int16 { return 8 } +func (*OffsetFetchResponse) MaxVersion() int16 { return 9 } func (v *OffsetFetchResponse) SetVersion(version int16) { v.Version = version } func (v *OffsetFetchResponse) GetVersion() int16 { return v.Version } func (v *OffsetFetchResponse) IsFlexible() bool { return v.Version >= 6 } @@ -40660,12 +41137,17 @@ type DescribeClusterRequest struct { // on CLUSTER. IncludeClusterAuthorizedOperations bool + // The endpoint type to describe. 1=brokers, 2=controllers. + // + // This field has a default of 1. + EndpointType int8 // v1+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags } func (*DescribeClusterRequest) Key() int16 { return 60 } -func (*DescribeClusterRequest) MaxVersion() int16 { return 0 } +func (*DescribeClusterRequest) MaxVersion() int16 { return 1 } func (v *DescribeClusterRequest) SetVersion(version int16) { v.Version = version } func (v *DescribeClusterRequest) GetVersion() int16 { return v.Version } func (v *DescribeClusterRequest) IsFlexible() bool { return v.Version >= 0 } @@ -40693,6 +41175,10 @@ func (v *DescribeClusterRequest) AppendTo(dst []byte) []byte { v := v.IncludeClusterAuthorizedOperations dst = kbin.AppendBool(dst, v) } + if version >= 1 { + v := v.EndpointType + dst = kbin.AppendInt8(dst, v) + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -40720,6 +41206,10 @@ func (v *DescribeClusterRequest) readFrom(src []byte, unsafe bool) error { v := b.Bool() s.IncludeClusterAuthorizedOperations = v } + if version >= 1 { + v := b.Int8() + s.EndpointType = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -40737,6 +41227,7 @@ func NewPtrDescribeClusterRequest() *DescribeClusterRequest { // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to DescribeClusterRequest. func (v *DescribeClusterRequest) Default() { + v.EndpointType = 1 } // NewDescribeClusterRequest returns a default DescribeClusterRequest @@ -40792,6 +41283,11 @@ type DescribeClusterResponse struct { // The top level error message, if any. ErrorMessage *string + // The endpoint type that was described. 1=brokers, 2=controllers. + // + // This field has a default of 1. + EndpointType int8 // v1+ + // The cluster ID that responding broker belongs to. ClusterID string @@ -40813,7 +41309,7 @@ type DescribeClusterResponse struct { } func (*DescribeClusterResponse) Key() int16 { return 60 } -func (*DescribeClusterResponse) MaxVersion() int16 { return 0 } +func (*DescribeClusterResponse) MaxVersion() int16 { return 1 } func (v *DescribeClusterResponse) SetVersion(version int16) { v.Version = version } func (v *DescribeClusterResponse) GetVersion() int16 { return v.Version } func (v *DescribeClusterResponse) IsFlexible() bool { return v.Version >= 0 } @@ -40847,6 +41343,10 @@ func (v *DescribeClusterResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendNullableString(dst, v) } } + if version >= 1 { + v := v.EndpointType + dst = kbin.AppendInt8(dst, v) + } { v := v.ClusterID if isFlexible { @@ -40950,6 +41450,10 @@ func (v *DescribeClusterResponse) readFrom(src []byte, unsafe bool) error { } s.ErrorMessage = v } + if version >= 1 { + v := b.Int8() + s.EndpointType = v + } { var v string if unsafe { @@ -41061,6 +41565,7 @@ func NewPtrDescribeClusterResponse() *DescribeClusterResponse { // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to DescribeClusterResponse. func (v *DescribeClusterResponse) Default() { + v.EndpointType = 1 v.ControllerID = -1 v.ClusterAuthorizedOperations = -2147483648 } @@ -44043,6 +44548,785 @@ func NewAllocateProducerIDsResponse() AllocateProducerIDsResponse { return v } +type ConsumerGroupHeartbeatRequestTopic struct { + // The topic ID. + TopicID [16]byte + + // The partitions. + Partitions []int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupHeartbeatRequestTopic. +func (v *ConsumerGroupHeartbeatRequestTopic) Default() { +} + +// NewConsumerGroupHeartbeatRequestTopic returns a default ConsumerGroupHeartbeatRequestTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupHeartbeatRequestTopic() ConsumerGroupHeartbeatRequestTopic { + var v ConsumerGroupHeartbeatRequestTopic + v.Default() + return v +} + +// ConsumerGroupHeartbeat is a part of KIP-848; there are a lot of details +// to this request so documentation is left to the KIP itself. +type ConsumerGroupHeartbeatRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // The group ID. + Group string + + // The member ID generated by the coordinator. This must be kept during + // the entire lifetime of the member. + MemberID string + + // The current member epoch; 0 to join the group, -1 to leave, -2 to + // indicate that the static member will rejoin. + MemberEpoch int32 + + // Instance ID of the member; null if not provided or if unchanging. + InstanceID *string + + // The rack ID of the member; null if not provided or if unchanging. + RackID *string + + // RebalanceTimeoutMillis is how long the coordinator will wait on a member + // to revoke its partitions. -1 if unchanging. + // + // This field has a default of -1. + RebalanceTimeoutMillis int32 + + // Subscribed topics; null if unchanging. + SubscribedTopicNames []string + + // The server side assignor to use; null if unchanging. + ServerAssignor *string + + // Topic partitions owned by the member; null if unchanging. + Topics []ConsumerGroupHeartbeatRequestTopic + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +func (*ConsumerGroupHeartbeatRequest) Key() int16 { return 68 } +func (*ConsumerGroupHeartbeatRequest) MaxVersion() int16 { return 0 } +func (v *ConsumerGroupHeartbeatRequest) SetVersion(version int16) { v.Version = version } +func (v *ConsumerGroupHeartbeatRequest) GetVersion() int16 { return v.Version } +func (v *ConsumerGroupHeartbeatRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *ConsumerGroupHeartbeatRequest) ResponseKind() Response { + r := &ConsumerGroupHeartbeatResponse{Version: v.Version} + r.Default() + return r +} + +// RequestWith is requests v on r and returns the response or an error. +// For sharded requests, the response may be merged and still return an error. +// It is better to rely on client.RequestSharded than to rely on proper merging behavior. +func (v *ConsumerGroupHeartbeatRequest) RequestWith(ctx context.Context, r Requestor) (*ConsumerGroupHeartbeatResponse, error) { + kresp, err := r.Request(ctx, v) + resp, _ := kresp.(*ConsumerGroupHeartbeatResponse) + return resp, err +} + +func (v *ConsumerGroupHeartbeatRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.Group + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.MemberID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.MemberEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.InstanceID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.RackID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.RebalanceTimeoutMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.SubscribedTopicNames + if isFlexible { + dst = kbin.AppendCompactNullableArrayLen(dst, len(v), v == nil) + } else { + dst = kbin.AppendNullableArrayLen(dst, len(v), v == nil) + } + for i := range v { + v := v[i] + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } + { + v := v.ServerAssignor + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactNullableArrayLen(dst, len(v), v == nil) + } else { + dst = kbin.AppendNullableArrayLen(dst, len(v), v == nil) + } + for i := range v { + v := &v[i] + { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + return dst +} + +func (v *ConsumerGroupHeartbeatRequest) ReadFrom(src []byte) error { + return v.readFrom(src, false) +} + +func (v *ConsumerGroupHeartbeatRequest) UnsafeReadFrom(src []byte) error { + return v.readFrom(src, true) +} + +func (v *ConsumerGroupHeartbeatRequest) readFrom(src []byte, unsafe bool) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Group = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.MemberID = v + } + { + v := b.Int32() + s.MemberEpoch = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.InstanceID = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.RackID = v + } + { + v := b.Int32() + s.RebalanceTimeoutMillis = v + } + { + v := s.SubscribedTopicNames + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if version < 0 || l == 0 { + a = []string{} + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) + } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.SubscribedTopicNames = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.ServerAssignor = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if version < 0 || l == 0 { + a = []ConsumerGroupHeartbeatRequestTopic{} + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ConsumerGroupHeartbeatRequestTopic, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Uuid() + s.TopicID = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + return b.Complete() +} + +// NewPtrConsumerGroupHeartbeatRequest returns a pointer to a default ConsumerGroupHeartbeatRequest +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrConsumerGroupHeartbeatRequest() *ConsumerGroupHeartbeatRequest { + var v ConsumerGroupHeartbeatRequest + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupHeartbeatRequest. +func (v *ConsumerGroupHeartbeatRequest) Default() { + v.RebalanceTimeoutMillis = -1 +} + +// NewConsumerGroupHeartbeatRequest returns a default ConsumerGroupHeartbeatRequest +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupHeartbeatRequest() ConsumerGroupHeartbeatRequest { + var v ConsumerGroupHeartbeatRequest + v.Default() + return v +} + +type ConsumerGroupHeartbeatResponseAssignmentTopic struct { + TopicID [16]byte + + Partitions []int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupHeartbeatResponseAssignmentTopic. +func (v *ConsumerGroupHeartbeatResponseAssignmentTopic) Default() { +} + +// NewConsumerGroupHeartbeatResponseAssignmentTopic returns a default ConsumerGroupHeartbeatResponseAssignmentTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupHeartbeatResponseAssignmentTopic() ConsumerGroupHeartbeatResponseAssignmentTopic { + var v ConsumerGroupHeartbeatResponseAssignmentTopic + v.Default() + return v +} + +type ConsumerGroupHeartbeatResponseAssignment struct { + // The topics partitions that can be used immediately. + Topics []ConsumerGroupHeartbeatResponseAssignmentTopic + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupHeartbeatResponseAssignment. +func (v *ConsumerGroupHeartbeatResponseAssignment) Default() { +} + +// NewConsumerGroupHeartbeatResponseAssignment returns a default ConsumerGroupHeartbeatResponseAssignment +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupHeartbeatResponseAssignment() ConsumerGroupHeartbeatResponseAssignment { + var v ConsumerGroupHeartbeatResponseAssignment + v.Default() + return v +} + +// ConsumerGroupHeartbeatResponse is returned from a ConsumerGroupHeartbeatRequest. +type ConsumerGroupHeartbeatResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + // ErrorCode is the error for this response. + // + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNKNOWN_MEMBER_ID (version 0+) + // - FENCED_MEMBER_EPOCH (version 0+) + // - UNSUPPORTED_ASSIGNOR (version 0+) + // - UNRELEASED_INSTANCE_ID (version 0+) + // - GROUP_MAX_SIZE_REACHED (version 0+) + ErrorCode int16 + + // A supplementary message if this errored. + ErrorMessage *string + + // The member ID generated by the coordinator; provided when joining + // with MemberEpoch=0. + MemberID *string + + // The member epoch. + MemberEpoch int32 + + // The heartbeat interval, in milliseconds. + HeartbeatIntervalMillis int32 + + // The assignment; null if not provided. + Assignment *ConsumerGroupHeartbeatResponseAssignment + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +func (*ConsumerGroupHeartbeatResponse) Key() int16 { return 68 } +func (*ConsumerGroupHeartbeatResponse) MaxVersion() int16 { return 0 } +func (v *ConsumerGroupHeartbeatResponse) SetVersion(version int16) { v.Version = version } +func (v *ConsumerGroupHeartbeatResponse) GetVersion() int16 { return v.Version } +func (v *ConsumerGroupHeartbeatResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *ConsumerGroupHeartbeatResponse) Throttle() (int32, bool) { + return v.ThrottleMillis, v.Version >= 0 +} + +func (v *ConsumerGroupHeartbeatResponse) SetThrottle(throttleMillis int32) { + v.ThrottleMillis = throttleMillis +} + +func (v *ConsumerGroupHeartbeatResponse) RequestKind() Request { + return &ConsumerGroupHeartbeatRequest{Version: v.Version} +} + +func (v *ConsumerGroupHeartbeatResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.ErrorMessage + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.MemberID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.MemberEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.HeartbeatIntervalMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Assignment + if v == nil { + dst = append(dst, 255) + } else { + dst = append(dst, 1) + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + return dst +} + +func (v *ConsumerGroupHeartbeatResponse) ReadFrom(src []byte) error { + return v.readFrom(src, false) +} + +func (v *ConsumerGroupHeartbeatResponse) UnsafeReadFrom(src []byte) error { + return v.readFrom(src, true) +} + +func (v *ConsumerGroupHeartbeatResponse) readFrom(src []byte, unsafe bool) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.ErrorMessage = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.MemberID = v + } + { + v := b.Int32() + s.MemberEpoch = v + } + { + v := b.Int32() + s.HeartbeatIntervalMillis = v + } + { + if present := b.Int8(); present != -1 && b.Ok() { + s.Assignment = new(ConsumerGroupHeartbeatResponseAssignment) + v := s.Assignment + v.Default() + s := v + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ConsumerGroupHeartbeatResponseAssignmentTopic, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Uuid() + s.TopicID = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + return b.Complete() +} + +// NewPtrConsumerGroupHeartbeatResponse returns a pointer to a default ConsumerGroupHeartbeatResponse +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrConsumerGroupHeartbeatResponse() *ConsumerGroupHeartbeatResponse { + var v ConsumerGroupHeartbeatResponse + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupHeartbeatResponse. +func (v *ConsumerGroupHeartbeatResponse) Default() { + { + v := &v.Assignment + _ = v + } +} + +// NewConsumerGroupHeartbeatResponse returns a default ConsumerGroupHeartbeatResponse +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupHeartbeatResponse() ConsumerGroupHeartbeatResponse { + var v ConsumerGroupHeartbeatResponse + v.Default() + return v +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -44185,6 +45469,8 @@ func RequestForKey(key int16) Request { return NewPtrListTransactionsRequest() case 67: return NewPtrAllocateProducerIDsRequest() + case 68: + return NewPtrConsumerGroupHeartbeatRequest() } } @@ -44330,6 +45616,8 @@ func ResponseForKey(key int16) Response { return NewPtrListTransactionsResponse() case 67: return NewPtrAllocateProducerIDsResponse() + case 68: + return NewPtrConsumerGroupHeartbeatResponse() } } @@ -44475,6 +45763,8 @@ func NameForKey(key int16) string { return "ListTransactions" case 67: return "AllocateProducerIDs" + case 68: + return "ConsumerGroupHeartbeat" } } @@ -44550,6 +45840,7 @@ const ( DescribeTransactions Key = 65 ListTransactions Key = 66 AllocateProducerIDs Key = 67 + ConsumerGroupHeartbeat Key = 68 ) // Name returns the name for this key.