From 88ce9e96debf598bb43f7cb60736172aaef64ab0 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 10 Oct 2024 19:50:56 -0600 Subject: [PATCH 1/4] kip-890 definitions A bunch of version bumps to indicate TransactionAbortable is supported as an error return. --- generate/definitions/00_produce | 2 +- generate/definitions/10_find_coordinator | 2 +- generate/definitions/22_init_producer_id | 2 +- generate/definitions/24_add_partitions_to_txn | 2 +- generate/definitions/25_add_offsets_to_txn | 2 +- generate/definitions/26_end_txn | 2 +- generate/definitions/28_txn_offset_commit | 2 +- pkg/kmsg/generated.go | 28 +++++++++---------- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/generate/definitions/00_produce b/generate/definitions/00_produce index 4afd2651..4addcd2a 100644 --- a/generate/definitions/00_produce +++ b/generate/definitions/00_produce @@ -6,7 +6,7 @@ // Note that the special client ID "__admin_client" will allow you to produce // records to internal topics. This is generally recommended if you want to // break your Kafka cluster. -ProduceRequest => key 0, max version 10, flexible v9+ +ProduceRequest => key 0, max version 11, flexible v9+ // TransactionID is the transaction ID to use for this request, allowing for // exactly once semantics. TransactionID: nullable-string // v3+ diff --git a/generate/definitions/10_find_coordinator b/generate/definitions/10_find_coordinator index 5c5d8cdd..238f2a87 100644 --- a/generate/definitions/10_find_coordinator +++ b/generate/definitions/10_find_coordinator @@ -3,7 +3,7 @@ // This coordinator is different from the broker leader coordinator. This // coordinator is the partition leader for the partition that is storing // the group or transaction ID. -FindCoordinatorRequest => key 10, max version 4, flexible v3+ +FindCoordinatorRequest => key 10, max version 5, flexible v3+ // CoordinatorKey is the ID to use for finding the coordinator. For groups, // this is the group name, for transactional producer, this is the // transactional ID. diff --git a/generate/definitions/22_init_producer_id b/generate/definitions/22_init_producer_id index 2afbf188..18df2313 100644 --- a/generate/definitions/22_init_producer_id +++ b/generate/definitions/22_init_producer_id @@ -4,7 +4,7 @@ // // Note that you do not need to go to a txn coordinator if you are initializing // a producer id without a transactional id. -InitProducerIDRequest => key 22, max version 4, flexible v2+, txn coordinator +InitProducerIDRequest => key 22, max version 5, flexible v2+, txn coordinator // TransactionalID is the ID to use for transactions if using transactions. TransactionalID: nullable-string // TransactionTimeoutMillis is how long a transaction is allowed before diff --git a/generate/definitions/24_add_partitions_to_txn b/generate/definitions/24_add_partitions_to_txn index dc8b757b..355f2803 100644 --- a/generate/definitions/24_add_partitions_to_txn +++ b/generate/definitions/24_add_partitions_to_txn @@ -8,7 +8,7 @@ // // Version 4 adds VerifyOnly field to check if partitions are already in // transaction and adds support to batch multiple transactions. -AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator +AddPartitionsToTxnRequest => key 24, max version 5, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. TransactionalID: string // v0-v3 // ProducerID is the producer ID of the client for this transactional ID diff --git a/generate/definitions/25_add_offsets_to_txn b/generate/definitions/25_add_offsets_to_txn index bd52bcac..a7750a3b 100644 --- a/generate/definitions/25_add_offsets_to_txn +++ b/generate/definitions/25_add_offsets_to_txn @@ -6,7 +6,7 @@ // Internally, this request simply adds the __consumer_offsets topic as a // partition for this transaction with AddPartitionsToTxn for the partition // in that topic that contains the group. -AddOffsetsToTxnRequest => key 25, max version 3, flexible v3+, txn coordinator +AddOffsetsToTxnRequest => key 25, max version 4, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. TransactionalID: string // ProducerID is the producer ID of the client for this transactional ID diff --git a/generate/definitions/26_end_txn b/generate/definitions/26_end_txn index 6b569817..d571b03f 100644 --- a/generate/definitions/26_end_txn +++ b/generate/definitions/26_end_txn @@ -1,6 +1,6 @@ // EndTxnRequest ends a transaction. This should be called after // TxnOffsetCommitRequest. -EndTxnRequest => key 26, max version 3, flexible v3+, txn coordinator +EndTxnRequest => key 26, max version 4, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. TransactionalID: string // ProducerID is the producer ID of the client for this transactional ID diff --git a/generate/definitions/28_txn_offset_commit b/generate/definitions/28_txn_offset_commit index 73186821..bbf743dd 100644 --- a/generate/definitions/28_txn_offset_commit +++ b/generate/definitions/28_txn_offset_commit @@ -1,7 +1,7 @@ // TxnOffsetCommitRequest sends offsets that are a part of this transaction // to be committed once the transaction itself finishes. This effectively // replaces OffsetCommitRequest for when using transactions. -TxnOffsetCommitRequest => key 28, max version 3, flexible v3+, group coordinator +TxnOffsetCommitRequest => key 28, max version 4, flexible v3+, group coordinator // TransactionalID is the transactional ID to use for this request. TransactionalID: string // Group is the group consumed in this transaction and to be used for diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 75bff995..583a8dc9 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -2783,7 +2783,7 @@ type ProduceRequest struct { } func (*ProduceRequest) Key() int16 { return 0 } -func (*ProduceRequest) MaxVersion() int16 { return 10 } +func (*ProduceRequest) MaxVersion() int16 { return 11 } func (v *ProduceRequest) SetVersion(version int16) { v.Version = version } func (v *ProduceRequest) GetVersion() int16 { return v.Version } func (v *ProduceRequest) IsFlexible() bool { return v.Version >= 9 } @@ -3306,7 +3306,7 @@ type ProduceResponse struct { } func (*ProduceResponse) Key() int16 { return 0 } -func (*ProduceResponse) MaxVersion() int16 { return 10 } +func (*ProduceResponse) MaxVersion() int16 { return 11 } func (v *ProduceResponse) SetVersion(version int16) { v.Version = version } func (v *ProduceResponse) GetVersion() int16 { return v.Version } func (v *ProduceResponse) IsFlexible() bool { return v.Version >= 9 } @@ -12493,7 +12493,7 @@ type FindCoordinatorRequest struct { } func (*FindCoordinatorRequest) Key() int16 { return 10 } -func (*FindCoordinatorRequest) MaxVersion() int16 { return 4 } +func (*FindCoordinatorRequest) MaxVersion() int16 { return 5 } func (v *FindCoordinatorRequest) SetVersion(version int16) { v.Version = version } func (v *FindCoordinatorRequest) GetVersion() int16 { return v.Version } func (v *FindCoordinatorRequest) IsFlexible() bool { return v.Version >= 3 } @@ -12733,7 +12733,7 @@ type FindCoordinatorResponse struct { } func (*FindCoordinatorResponse) Key() int16 { return 10 } -func (*FindCoordinatorResponse) MaxVersion() int16 { return 4 } +func (*FindCoordinatorResponse) MaxVersion() int16 { return 5 } func (v *FindCoordinatorResponse) SetVersion(version int16) { v.Version = version } func (v *FindCoordinatorResponse) GetVersion() int16 { return v.Version } func (v *FindCoordinatorResponse) IsFlexible() bool { return v.Version >= 3 } @@ -19348,7 +19348,7 @@ type InitProducerIDRequest struct { } func (*InitProducerIDRequest) Key() int16 { return 22 } -func (*InitProducerIDRequest) MaxVersion() int16 { return 4 } +func (*InitProducerIDRequest) MaxVersion() int16 { return 5 } func (v *InitProducerIDRequest) SetVersion(version int16) { v.Version = version } func (v *InitProducerIDRequest) GetVersion() int16 { return v.Version } func (v *InitProducerIDRequest) IsFlexible() bool { return v.Version >= 2 } @@ -19523,7 +19523,7 @@ type InitProducerIDResponse struct { } func (*InitProducerIDResponse) Key() int16 { return 22 } -func (*InitProducerIDResponse) MaxVersion() int16 { return 4 } +func (*InitProducerIDResponse) MaxVersion() int16 { return 5 } func (v *InitProducerIDResponse) SetVersion(version int16) { v.Version = version } func (v *InitProducerIDResponse) GetVersion() int16 { return v.Version } func (v *InitProducerIDResponse) IsFlexible() bool { return v.Version >= 2 } @@ -20367,7 +20367,7 @@ type AddPartitionsToTxnRequest struct { } func (*AddPartitionsToTxnRequest) Key() int16 { return 24 } -func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 4 } +func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 5 } func (v *AddPartitionsToTxnRequest) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnRequest) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -20953,7 +20953,7 @@ type AddPartitionsToTxnResponse struct { } func (*AddPartitionsToTxnResponse) Key() int16 { return 24 } -func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 4 } +func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 5 } func (v *AddPartitionsToTxnResponse) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnResponse) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -21388,7 +21388,7 @@ type AddOffsetsToTxnRequest struct { } func (*AddOffsetsToTxnRequest) Key() int16 { return 25 } -func (*AddOffsetsToTxnRequest) MaxVersion() int16 { return 3 } +func (*AddOffsetsToTxnRequest) MaxVersion() int16 { return 4 } func (v *AddOffsetsToTxnRequest) SetVersion(version int16) { v.Version = version } func (v *AddOffsetsToTxnRequest) GetVersion() int16 { return v.Version } func (v *AddOffsetsToTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -21559,7 +21559,7 @@ type AddOffsetsToTxnResponse struct { } func (*AddOffsetsToTxnResponse) Key() int16 { return 25 } -func (*AddOffsetsToTxnResponse) MaxVersion() int16 { return 3 } +func (*AddOffsetsToTxnResponse) MaxVersion() int16 { return 4 } func (v *AddOffsetsToTxnResponse) SetVersion(version int16) { v.Version = version } func (v *AddOffsetsToTxnResponse) GetVersion() int16 { return v.Version } func (v *AddOffsetsToTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -21668,7 +21668,7 @@ type EndTxnRequest struct { } func (*EndTxnRequest) Key() int16 { return 26 } -func (*EndTxnRequest) MaxVersion() int16 { return 3 } +func (*EndTxnRequest) MaxVersion() int16 { return 4 } func (v *EndTxnRequest) SetVersion(version int16) { v.Version = version } func (v *EndTxnRequest) GetVersion() int16 { return v.Version } func (v *EndTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -21832,7 +21832,7 @@ type EndTxnResponse struct { } func (*EndTxnResponse) Key() int16 { return 26 } -func (*EndTxnResponse) MaxVersion() int16 { return 3 } +func (*EndTxnResponse) MaxVersion() int16 { return 4 } func (v *EndTxnResponse) SetVersion(version int16) { v.Version = version } func (v *EndTxnResponse) GetVersion() int16 { return v.Version } func (v *EndTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -22678,7 +22678,7 @@ type TxnOffsetCommitRequest struct { } func (*TxnOffsetCommitRequest) Key() int16 { return 28 } -func (*TxnOffsetCommitRequest) MaxVersion() int16 { return 3 } +func (*TxnOffsetCommitRequest) MaxVersion() int16 { return 4 } func (v *TxnOffsetCommitRequest) SetVersion(version int16) { v.Version = version } func (v *TxnOffsetCommitRequest) GetVersion() int16 { return v.Version } func (v *TxnOffsetCommitRequest) IsFlexible() bool { return v.Version >= 3 } @@ -23143,7 +23143,7 @@ type TxnOffsetCommitResponse struct { } func (*TxnOffsetCommitResponse) Key() int16 { return 28 } -func (*TxnOffsetCommitResponse) MaxVersion() int16 { return 3 } +func (*TxnOffsetCommitResponse) MaxVersion() int16 { return 4 } func (v *TxnOffsetCommitResponse) SetVersion(version int16) { v.Version = version } func (v *TxnOffsetCommitResponse) GetVersion() int16 { return v.Version } func (v *TxnOffsetCommitResponse) IsFlexible() bool { return v.Version >= 3 } From 6014a978938138bf09294b9ced3ec50aa996e2e0 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 10 Oct 2024 19:58:38 -0600 Subject: [PATCH 2/4] kip-848 more definitions Added in Kafka 3.8: * ListGroups.TypesFilter * ConsumerGroupDescribe request --- generate/definitions/16_list_groups | 7 +- .../definitions/69_consumer_group_describe | 67 + pkg/kmsg/generated.go | 1178 ++++++++++++++++- 3 files changed, 1248 insertions(+), 4 deletions(-) create mode 100644 generate/definitions/69_consumer_group_describe diff --git a/generate/definitions/16_list_groups b/generate/definitions/16_list_groups index 02fb6b04..c09453f6 100644 --- a/generate/definitions/16_list_groups +++ b/generate/definitions/16_list_groups @@ -1,12 +1,15 @@ // ListGroupsRequest issues a request to list all groups. // // To list all groups in a cluster, this must be issued to every broker. -ListGroupsRequest => key 16, max version 4, flexible v3+ +ListGroupsRequest => key 16, max version 5, flexible v3+ // StatesFilter, proposed in KIP-518 and introduced in Kafka 2.6.0, // allows filtering groups by state, where a state is any of // "Preparing", "PreparingRebalance", "CompletingRebalance", "Stable", // "Dead", or "Empty". If empty, all groups are returned. StatesFilter: [string] // v4+ + // TypesFilter, part of KIP-848, filters the types of groups we want + // to list. If empty, all groups are returned. + TypesFilter: [string] // v5+ // ListGroupsResponse is returned from a ListGroupsRequest. ListGroupsResponse => @@ -25,3 +28,5 @@ ListGroupsResponse => ProtocolType: string // The group state. GroupState: string // v4+ + // The group type. + GroupType: string // v5+ diff --git a/generate/definitions/69_consumer_group_describe b/generate/definitions/69_consumer_group_describe new file mode 100644 index 00000000..463863dd --- /dev/null +++ b/generate/definitions/69_consumer_group_describe @@ -0,0 +1,67 @@ +// Assignment contains consumer group assignments. +Assignment => not top level, no encoding, flexible v0+ + // The topics & partitions assigned to the member. + TopicPartitions: [=>] + TopicID: uuid + Topic: string + Partitions: [int32] + +// ConsumerGroupDescribe is a part of KIP-848; this is the +// "next generation" equivalent of DescribeGroups. +ConsumerGroupDescribeRequest => key 69, max version 0, flexible v0+ + // The IDs of the groups to describe. + Groups: [string] + // Whether to include authorized operations. + IncludeAuthorizedOperations: bool + +// ConsumerGroupDescribeResponse is returned from a ConsumerGroupDescribeRequest. +ConsumerGroupDescribeResponse => + ThrottleMillis + Groups: [=>] + // ErrorCode is the error for this response. + // + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + ErrorCode: int16 + // A supplementary message if this errored. + ErrorMessage: nullable-string + // The group ID. + Group: string + // The group state. + State: string + // The group epoch. + Epoch: int32 + // The assignment epoch. + AssignmentEpoch: int32 + // The selected assignor. + AssignorName: string + // Members of the group. + Members: [=>] + // The member ID. + MemberID: string + // The member instance ID, if any. + InstanceID: nullable-string + // The member rack ID, if any. + RackID: nullable-string + // The current member epoch. + MemberEpoch: int32 + // The client ID. + ClientID: string + // The client host. + ClientHost: string + // The subscribed topic names. + SubscribedTopics: [string] + // The subscribed topic regex, if any. + SubscribedTopicRegex: nullable-string + // The current assignment. + Assignment: Assignment + // The target assignment. + TargetAssignment: Assignment + // 32 bit bitfield representing authorized operations for the group. + AuthorizedOperations: int32(-2147483648) diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 583a8dc9..70ec0b00 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -13,7 +13,31 @@ import ( // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 68 +const MaxKey = 69 + +type AssignmentTopicPartition struct { + TopicID [16]byte + + Topic string + + Partitions []int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AssignmentTopicPartition. +func (v *AssignmentTopicPartition) Default() { +} + +// NewAssignmentTopicPartition returns a default AssignmentTopicPartition +// This is a shortcut for creating a struct and calling Default yourself. +func NewAssignmentTopicPartition() AssignmentTopicPartition { + var v AssignmentTopicPartition + v.Default() + return v +} // MessageV0 is the message format Kafka used prior to 0.10. // @@ -15960,12 +15984,16 @@ type ListGroupsRequest struct { // "Dead", or "Empty". If empty, all groups are returned. StatesFilter []string // v4+ + // TypesFilter, part of KIP-848, filters the types of groups we want + // to list. If empty, all groups are returned. + TypesFilter []string // v5+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } func (*ListGroupsRequest) Key() int16 { return 16 } -func (*ListGroupsRequest) MaxVersion() int16 { return 4 } +func (*ListGroupsRequest) MaxVersion() int16 { return 5 } func (v *ListGroupsRequest) SetVersion(version int16) { v.Version = version } func (v *ListGroupsRequest) GetVersion() int16 { return v.Version } func (v *ListGroupsRequest) IsFlexible() bool { return v.Version >= 3 } @@ -16005,6 +16033,22 @@ func (v *ListGroupsRequest) AppendTo(dst []byte) []byte { } } } + if version >= 5 { + v := v.TypesFilter + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -16064,6 +16108,42 @@ func (v *ListGroupsRequest) readFrom(src []byte, unsafe bool) error { v = a s.StatesFilter = v } + if version >= 5 { + v := s.TypesFilter + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) + } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.TypesFilter = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -16101,6 +16181,9 @@ type ListGroupsResponseGroup struct { // The group state. GroupState string // v4+ + // The group type. + GroupType string // v5+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } @@ -16146,7 +16229,7 @@ type ListGroupsResponse struct { } func (*ListGroupsResponse) Key() int16 { return 16 } -func (*ListGroupsResponse) MaxVersion() int16 { return 4 } +func (*ListGroupsResponse) MaxVersion() int16 { return 5 } func (v *ListGroupsResponse) SetVersion(version int16) { v.Version = version } func (v *ListGroupsResponse) GetVersion() int16 { return v.Version } func (v *ListGroupsResponse) IsFlexible() bool { return v.Version >= 3 } @@ -16200,6 +16283,14 @@ func (v *ListGroupsResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } + if version >= 5 { + v := v.GroupType + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -16308,6 +16399,23 @@ func (v *ListGroupsResponse) readFrom(src []byte, unsafe bool) error { } s.GroupState = v } + if version >= 5 { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.GroupType = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -45327,6 +45435,1063 @@ func NewConsumerGroupHeartbeatResponse() ConsumerGroupHeartbeatResponse { return v } +// Assignment contains consumer group assignments. +type Assignment struct { + // The topics & partitions assigned to the member. + TopicPartitions []AssignmentTopicPartition + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to Assignment. +func (v *Assignment) Default() { +} + +// NewAssignment returns a default Assignment +// This is a shortcut for creating a struct and calling Default yourself. +func NewAssignment() Assignment { + var v Assignment + v.Default() + return v +} + +// ConsumerGroupDescribe is a part of KIP-848; this is the +// "next generation" equivalent of DescribeGroups. +type ConsumerGroupDescribeRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // The IDs of the groups to describe. + Groups []string + + // Whether to include authorized operations. + IncludeAuthorizedOperations bool + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +func (*ConsumerGroupDescribeRequest) Key() int16 { return 69 } +func (*ConsumerGroupDescribeRequest) MaxVersion() int16 { return 0 } +func (v *ConsumerGroupDescribeRequest) SetVersion(version int16) { v.Version = version } +func (v *ConsumerGroupDescribeRequest) GetVersion() int16 { return v.Version } +func (v *ConsumerGroupDescribeRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *ConsumerGroupDescribeRequest) ResponseKind() Response { + r := &ConsumerGroupDescribeResponse{Version: v.Version} + r.Default() + return r +} + +// RequestWith is requests v on r and returns the response or an error. +// For sharded requests, the response may be merged and still return an error. +// It is better to rely on client.RequestSharded than to rely on proper merging behavior. +func (v *ConsumerGroupDescribeRequest) RequestWith(ctx context.Context, r Requestor) (*ConsumerGroupDescribeResponse, error) { + kresp, err := r.Request(ctx, v) + resp, _ := kresp.(*ConsumerGroupDescribeResponse) + return resp, err +} + +func (v *ConsumerGroupDescribeRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.Groups + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } + { + v := v.IncludeAuthorizedOperations + dst = kbin.AppendBool(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + return dst +} + +func (v *ConsumerGroupDescribeRequest) ReadFrom(src []byte) error { + return v.readFrom(src, false) +} + +func (v *ConsumerGroupDescribeRequest) UnsafeReadFrom(src []byte) error { + return v.readFrom(src, true) +} + +func (v *ConsumerGroupDescribeRequest) readFrom(src []byte, unsafe bool) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := s.Groups + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) + } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.Groups = v + } + { + v := b.Bool() + s.IncludeAuthorizedOperations = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + return b.Complete() +} + +// NewPtrConsumerGroupDescribeRequest returns a pointer to a default ConsumerGroupDescribeRequest +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrConsumerGroupDescribeRequest() *ConsumerGroupDescribeRequest { + var v ConsumerGroupDescribeRequest + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeRequest. +func (v *ConsumerGroupDescribeRequest) Default() { +} + +// NewConsumerGroupDescribeRequest returns a default ConsumerGroupDescribeRequest +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeRequest() ConsumerGroupDescribeRequest { + var v ConsumerGroupDescribeRequest + v.Default() + return v +} + +type ConsumerGroupDescribeResponseGroupMember struct { + // The member ID. + MemberID string + + // The member instance ID, if any. + InstanceID *string + + // The member rack ID, if any. + RackID *string + + // The current member epoch. + MemberEpoch int32 + + // The client ID. + ClientID string + + // The client host. + ClientHost string + + // The subscribed topic names. + SubscribedTopics []string + + // The subscribed topic regex, if any. + SubscribedTopicRegex *string + + // The current assignment. + Assignment Assignment + + // The target assignment. + TargetAssignment Assignment + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeResponseGroupMember. +func (v *ConsumerGroupDescribeResponseGroupMember) Default() { + { + v := &v.Assignment + _ = v + } + { + v := &v.TargetAssignment + _ = v + } +} + +// NewConsumerGroupDescribeResponseGroupMember returns a default ConsumerGroupDescribeResponseGroupMember +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeResponseGroupMember() ConsumerGroupDescribeResponseGroupMember { + var v ConsumerGroupDescribeResponseGroupMember + v.Default() + return v +} + +type ConsumerGroupDescribeResponseGroup struct { + // ErrorCode is the error for this response. + // + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + ErrorCode int16 + + // A supplementary message if this errored. + ErrorMessage *string + + // The group ID. + Group string + + // The group state. + State string + + // The group epoch. + Epoch int32 + + // The assignment epoch. + AssignmentEpoch int32 + + // The selected assignor. + AssignorName string + + // Members of the group. + Members []ConsumerGroupDescribeResponseGroupMember + + // 32 bit bitfield representing authorized operations for the group. + // + // This field has a default of -2147483648. + AuthorizedOperations int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeResponseGroup. +func (v *ConsumerGroupDescribeResponseGroup) Default() { + v.AuthorizedOperations = -2147483648 +} + +// NewConsumerGroupDescribeResponseGroup returns a default ConsumerGroupDescribeResponseGroup +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeResponseGroup() ConsumerGroupDescribeResponseGroup { + var v ConsumerGroupDescribeResponseGroup + v.Default() + return v +} + +// ConsumerGroupDescribeResponse is returned from a ConsumerGroupDescribeRequest. +type ConsumerGroupDescribeResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + Groups []ConsumerGroupDescribeResponseGroup + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +func (*ConsumerGroupDescribeResponse) Key() int16 { return 69 } +func (*ConsumerGroupDescribeResponse) MaxVersion() int16 { return 0 } +func (v *ConsumerGroupDescribeResponse) SetVersion(version int16) { v.Version = version } +func (v *ConsumerGroupDescribeResponse) GetVersion() int16 { return v.Version } +func (v *ConsumerGroupDescribeResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *ConsumerGroupDescribeResponse) Throttle() (int32, bool) { + return v.ThrottleMillis, v.Version >= 0 +} + +func (v *ConsumerGroupDescribeResponse) SetThrottle(throttleMillis int32) { + v.ThrottleMillis = throttleMillis +} + +func (v *ConsumerGroupDescribeResponse) RequestKind() Request { + return &ConsumerGroupDescribeRequest{Version: v.Version} +} + +func (v *ConsumerGroupDescribeResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Groups + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.ErrorMessage + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.Group + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.State + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Epoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.AssignmentEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.AssignorName + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Members + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.MemberID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.InstanceID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.RackID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.MemberEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ClientID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.ClientHost + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.SubscribedTopics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } + { + v := v.SubscribedTopicRegex + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := &v.Assignment + { + v := v.TopicPartitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + { + v := &v.TargetAssignment + { + v := v.TopicPartitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + { + v := v.AuthorizedOperations + dst = kbin.AppendInt32(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + return dst +} + +func (v *ConsumerGroupDescribeResponse) ReadFrom(src []byte) error { + return v.readFrom(src, false) +} + +func (v *ConsumerGroupDescribeResponse) UnsafeReadFrom(src []byte) error { + return v.readFrom(src, true) +} + +func (v *ConsumerGroupDescribeResponse) readFrom(src []byte, unsafe bool) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := s.Groups + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ConsumerGroupDescribeResponseGroup, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int16() + s.ErrorCode = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.ErrorMessage = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Group = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.State = v + } + { + v := b.Int32() + s.Epoch = v + } + { + v := b.Int32() + s.AssignmentEpoch = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.AssignorName = v + } + { + v := s.Members + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ConsumerGroupDescribeResponseGroupMember, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.MemberID = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.InstanceID = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.RackID = v + } + { + v := b.Int32() + s.MemberEpoch = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.ClientID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.ClientHost = v + } + { + v := s.SubscribedTopics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) + } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.SubscribedTopics = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.SubscribedTopicRegex = v + } + { + v := &s.Assignment + v.Default() + s := v + { + v := s.TopicPartitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AssignmentTopicPartition, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Uuid() + s.TopicID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.TopicPartitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + { + v := &s.TargetAssignment + v.Default() + s := v + { + v := s.TopicPartitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AssignmentTopicPartition, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Uuid() + s.TopicID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.TopicPartitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Members = v + } + { + v := b.Int32() + s.AuthorizedOperations = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Groups = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + return b.Complete() +} + +// NewPtrConsumerGroupDescribeResponse returns a pointer to a default ConsumerGroupDescribeResponse +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrConsumerGroupDescribeResponse() *ConsumerGroupDescribeResponse { + var v ConsumerGroupDescribeResponse + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeResponse. +func (v *ConsumerGroupDescribeResponse) Default() { +} + +// NewConsumerGroupDescribeResponse returns a default ConsumerGroupDescribeResponse +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeResponse() ConsumerGroupDescribeResponse { + var v ConsumerGroupDescribeResponse + v.Default() + return v +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -45471,6 +46636,8 @@ func RequestForKey(key int16) Request { return NewPtrAllocateProducerIDsRequest() case 68: return NewPtrConsumerGroupHeartbeatRequest() + case 69: + return NewPtrConsumerGroupDescribeRequest() } } @@ -45618,6 +46785,8 @@ func ResponseForKey(key int16) Response { return NewPtrAllocateProducerIDsResponse() case 68: return NewPtrConsumerGroupHeartbeatResponse() + case 69: + return NewPtrConsumerGroupDescribeResponse() } } @@ -45765,6 +46934,8 @@ func NameForKey(key int16) string { return "AllocateProducerIDs" case 68: return "ConsumerGroupHeartbeat" + case 69: + return "ConsumerGroupDescribe" } } @@ -45841,6 +47012,7 @@ const ( ListTransactions Key = 66 AllocateProducerIDs Key = 67 ConsumerGroupHeartbeat Key = 68 + ConsumerGroupDescribe Key = 69 ) // Name returns the name for this key. From e7b9d3c915e9e6744d1e32c166528e7e2ca67210 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 10 Oct 2024 20:26:05 -0600 Subject: [PATCH 3/4] kip-994 proto Only ListTransactions was modified in 3.8 --- generate/definitions/66_list_transactions | 6 +++++- pkg/kmsg/generated.go | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/generate/definitions/66_list_transactions b/generate/definitions/66_list_transactions index 216ac02c..bf04ac93 100644 --- a/generate/definitions/66_list_transactions +++ b/generate/definitions/66_list_transactions @@ -8,8 +8,12 @@ ListTransactionsRequest => key 66, max version 0, flexible v0+ StateFilters: [string] // The producer IDs to filter by: if empty, all transactions will be // returned; if non-empty, only transactions which match one of the filtered - // producer IDs will be returned + // producer IDs will be returned. ProducerIDFilters: [int64] + // Duration (in millis) to filter by: if < 0, all transactions will be + // returned; otherwise, only transactions running longer than this duration + // will be returned. + DurationFilterMillis: int64(-1) // v1+ // ListTransactionsResponse is a response to a ListTransactionsRequest. ListTransactionsResponse => diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 70ec0b00..16350a4a 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -43955,9 +43955,16 @@ type ListTransactionsRequest struct { // The producer IDs to filter by: if empty, all transactions will be // returned; if non-empty, only transactions which match one of the filtered - // producer IDs will be returned + // producer IDs will be returned. ProducerIDFilters []int64 + // Duration (in millis) to filter by: if < 0, all transactions will be + // returned; otherwise, only transactions running longer than this duration + // will be returned. + // + // This field has a default of -1. + DurationFilterMillis int64 // v1+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags } @@ -44015,6 +44022,10 @@ func (v *ListTransactionsRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendInt64(dst, v) } } + if version >= 1 { + v := v.DurationFilterMillis + dst = kbin.AppendInt64(dst, v) + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -44097,6 +44108,10 @@ func (v *ListTransactionsRequest) readFrom(src []byte, unsafe bool) error { v = a s.ProducerIDFilters = v } + if version >= 1 { + v := b.Int64() + s.DurationFilterMillis = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -44114,6 +44129,7 @@ func NewPtrListTransactionsRequest() *ListTransactionsRequest { // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to ListTransactionsRequest. func (v *ListTransactionsRequest) Default() { + v.DurationFilterMillis = -1 } // NewListTransactionsRequest returns a default ListTransactionsRequest From 0d24bb59a8553008f6179b669d97910f9ded5163 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Oct 2024 09:35:06 -0600 Subject: [PATCH 4/4] generate / kmsg: update GroupMetadata{Key,Value} Not much changed here. Closes #804. --- generate/definitions/misc | 6 +- pkg/kmsg/generated.go | 201 ++++++++++++++++++++++++++++++++------ 2 files changed, 174 insertions(+), 33 deletions(-) diff --git a/generate/definitions/misc b/generate/definitions/misc index 8166cee2..7c3c5cd8 100644 --- a/generate/definitions/misc +++ b/generate/definitions/misc @@ -272,7 +272,7 @@ GroupMetadataKey => not top level, with version field // // KAFKA-7862 commit 0f995ba6be, proposed in KIP-345 and included in 2.3.0 // released version 3. -GroupMetadataValue => not top level, with version field +GroupMetadataValue => not top level, with version field, flexible v4+ // Version is the version of this value. Version: int16 // ProtocolType is the type of protocol being used for the group @@ -287,7 +287,7 @@ GroupMetadataValue => not top level, with version field Leader: nullable-string // CurrentStateTimestamp is the timestamp for this state of the group // (stable, etc.). - CurrentStateTimestamp: int64 // v2+ + CurrentStateTimestamp: int64(-1) // v2+ // Members are the group members. Members: [=>] // MemberID is a group member. @@ -299,7 +299,7 @@ GroupMetadataValue => not top level, with version field // ClientHost is the hostname of this group member. ClientHost: string // RebalanceTimeoutMillis is the rebalance timeout of this group member. - RebalanceTimeoutMillis: int32 // v1+ + RebalanceTimeoutMillis: int32(-1) // v1+ // SessionTimeoutMillis is the session timeout of this group member. SessionTimeoutMillis: int32 // Subscription is the subscription of this group member. diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 16350a4a..584f6f37 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -922,6 +922,8 @@ type GroupMetadataValueMember struct { ClientHost string // RebalanceTimeoutMillis is the rebalance timeout of this group member. + // + // This field has a default of -1. RebalanceTimeoutMillis int32 // v1+ // SessionTimeoutMillis is the session timeout of this group member. @@ -932,11 +934,15 @@ type GroupMetadataValueMember struct { // Assignment is what the leader assigned this group member. Assignment []byte + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v4+ } // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to GroupMetadataValueMember. func (v *GroupMetadataValueMember) Default() { + v.RebalanceTimeoutMillis = -1 } // NewGroupMetadataValueMember returns a default GroupMetadataValueMember @@ -980,22 +986,33 @@ type GroupMetadataValue struct { // CurrentStateTimestamp is the timestamp for this state of the group // (stable, etc.). + // + // This field has a default of -1. CurrentStateTimestamp int64 // v2+ // Members are the group members. Members []GroupMetadataValueMember + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v4+ } func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { version := v.Version _ = version + isFlexible := version >= 4 + _ = isFlexible { v := v.Version dst = kbin.AppendInt16(dst, v) } { v := v.ProtocolType - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } { v := v.Generation @@ -1003,11 +1020,19 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Protocol - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } { v := v.Leader - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } if version >= 2 { v := v.CurrentStateTimestamp @@ -1015,24 +1040,44 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Members - dst = kbin.AppendArrayLen(dst, len(v)) + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } for i := range v { v := &v[i] { v := v.MemberID - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } if version >= 3 { v := v.InstanceID - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } { v := v.ClientID - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } { v := v.ClientHost - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } if version >= 1 { v := v.RebalanceTimeoutMillis @@ -1044,14 +1089,30 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Subscription - dst = kbin.AppendBytes(dst, v) + if isFlexible { + dst = kbin.AppendCompactBytes(dst, v) + } else { + dst = kbin.AppendBytes(dst, v) + } } { v := v.Assignment - dst = kbin.AppendBytes(dst, v) + if isFlexible { + dst = kbin.AppendCompactBytes(dst, v) + } else { + dst = kbin.AppendBytes(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) } } } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } return dst } @@ -1069,13 +1130,23 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { v.Version = b.Int16() version := v.Version _ = version + isFlexible := version >= 4 + _ = isFlexible s := v { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ProtocolType = v } @@ -1085,19 +1156,35 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { } { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.Protocol = v } { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.Leader = v } @@ -1109,7 +1196,11 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { v := s.Members a := v var l int32 - l = b.ArrayLen() + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } if !b.Ok() { return b.Complete() } @@ -1124,36 +1215,68 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.MemberID = v } if version >= 3 { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.InstanceID = v } { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ClientID = v } { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ClientHost = v } @@ -1166,23 +1289,41 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { s.SessionTimeoutMillis = v } { - v := b.Bytes() + var v []byte + if isFlexible { + v = b.CompactBytes() + } else { + v = b.Bytes() + } s.Subscription = v } { - v := b.Bytes() + var v []byte + if isFlexible { + v = b.CompactBytes() + } else { + v = b.Bytes() + } s.Assignment = v } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } } v = a s.Members = v } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } return b.Complete() } +func (v *GroupMetadataValue) IsFlexible() bool { return v.Version >= 4 } // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to GroupMetadataValue. func (v *GroupMetadataValue) Default() { + v.CurrentStateTimestamp = -1 } // NewGroupMetadataValue returns a default GroupMetadataValue