From 2e98bd6f93145ed253ed91e57c50231695a726b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Thu, 17 Oct 2024 09:58:32 +0200 Subject: [PATCH] cherry-pick latest upstream changes (#10) * Fix typo in kgo.Client.ResumeFetchTopics() docs Signed-off-by: Mihai Todor * add `NewOffsetFromRecord` helper function * Fix typo in Record.ProducerID doc comment. * Don't set nil config when seeding topics in kfake cluster Setting the configs to `nil` causes it to panic later when trying to alter the topic configs, as it only checks for entry in the map not being present, not for it being nil Signed-off-by: Oleg Zaytsev * Add Opts method for sr.Client * Merge pull request #826 from colega/don-t-set-nil-config-when-seeding-topics-in-kfake-cluster Don't set nil config when seeding topics in kfake cluster * Merge pull request #821 from seizethedave/davidgrant/producer-doc Fix typo in Record.ProducerID doc comment. * Merge pull request #812 from mihaitodor/fix-doc-typo Fix typo in kgo.Client.ResumeFetchTopics() docs * kgo: fix potential deadlock when reaching max buffered (records|bytes) Problem: * Record A exceeds max, is on path to block * Record B finishes concurrently * Record A's context cancels * Record A's goroutine waiting to be unblocked returns, leaves accounting mutex in locked state * Record A's select statement chooses context-canceled case, trying to grab the accounting mutex lock See #831 for more details. Closes #831. * all: unlint what is now cropping up gosec ones are deliberate; govet ones are now randomly showing up (and also deliberate) * Merge pull request #832 from twmb/831 kgo: fix potential deadlock when reaching max buffered (records|bytes) * kgo: misc doc update * kgo: ignore OOOSN where possible See embedded comment. This precedes handling KIP-890. Closes #805. * kip-890 definitions A bunch of version bumps to indicate TransactionAbortable is supported as an error return. 
* kip-848 more definitions Added in Kafka 3.8: * ListGroups.TypesFilter * ConsumerGroupDescribe request * kip-994 proto Only ListTransactions was modified in 3.8 * sr: add StatusCode to ResponseError, and message if the body is empty Closes #819. * generate / kmsg: update GroupMetadata{Key,Value} Not much changed here. Closes #804. * kgo: do not add all topics to internal tps map when regex consuming The internal tps map is meant to be what we store topicPartitions in that we are candidates to be consumed. This is filtered in assignPartitions to only opt-in partitions that are actually being consumed. It's not BAD if we store all topics in that map, but it's not the intent. The rest of the client worked fine even with extra topics in the map. When regex consuming, the metadata function previously put all topics into the map always. Now, we move the regex evaluation logic -- duplicated in both the direct and group consumers -- into one function and use that for filtering within metadata. This introduces a required sequence of filtering THEN finding assignments, which is fine / was the way things operated anyway. Moving the filtering to metadata (only in the regex consuming logic) means that we no longer store information for topics we are not consuming. Indirectly, this fixes a bug where `GetConsumeTopics` would always return ALL topics when regex consuming, because `GetConsumeTopics` always just returned what was in the `tps` field. This adds a test for the fixed behavior, as well as tests that NOT regex consuming always returns all topics the user is interested in. Closes #810. * Merge pull request #833 from twmb/proto-3.8.0 Proto 3.8.0 * kgo: support Kafka 3.8's kip-890 modifications STILL NOT ALL OF KIP-890, despite what I originally coded. Kafka 3.8 only added support for TransactionAbortable. Producers still need to send AddPartitionsToTxn. 
* kversion: note kip-848 additions for kafka 3.8 * kversion: note kip-994 added in 3.8, finalize 3.8 * kversion: ignore API keys 74,75 when guessing versions These are in Kraft only, and are two requests from two separate KIPs that aren't fully supported yet. Not sure why only these two were stabilized. * README: note 3.8 KIPs * kgo: bump kmsg pinned dep * Merge pull request #840 from twmb/kafka-3.8.0 Kafka 3.8.0 * Merge pull request #760 from twmb/753 kgo: add AllowRebalance and CloseAllowingRebalance to GroupTransactSession * Merge pull request #789 from sbuliarca/errgroupsession-export-err kgo: export the wrapped error from ErrGroupSession * Merge pull request #794 from twmb/790 kgo: add TopicID to the FetchTopic type * Merge pull request #814 from noamcohen97/new-offset-helper kadm: add `NewOffsetFromRecord` helper function * Merge pull request #829 from andrewstucki/sr-client-opts Add Opts method for sr.Client * Merge pull request #834 from twmb/805 kgo: ignore OOOSN where possible * Merge pull request #835 from twmb/819 sr: add StatusCode to ResponseError, and message if the body is empty * Merge pull request #838 from twmb/810 kgo: do not add all topics to internal tps map when regex consuming * CHANGELOG: note incoming release * Merge pull request #841 from twmb/1.18-changelog CHANGELOG: note incoming release * pkg/sr: require go 1.22 No real reason, no real reason not to. This also allows one commit after the top level franz tag. 
* Merge pull request #842 from twmb/sr-1.22 pkg/sr: require go 1.22 * pkg/kadm: bump go deps * Merge pull request #843 from twmb/kadm pkg/kadm: bump go deps --------- Signed-off-by: Mihai Todor Signed-off-by: Oleg Zaytsev Co-authored-by: Mihai Todor Co-authored-by: Noam Cohen Co-authored-by: David Grant Co-authored-by: Oleg Zaytsev Co-authored-by: Andrew Stucki Co-authored-by: Travis Bischel --- .golangci.yml | 3 +- CHANGELOG.md | 55 + README.md | 4 +- generate/definitions/00_produce | 2 +- generate/definitions/10_find_coordinator | 2 +- generate/definitions/16_list_groups | 7 +- generate/definitions/22_init_producer_id | 2 +- generate/definitions/24_add_partitions_to_txn | 2 +- generate/definitions/25_add_offsets_to_txn | 2 +- generate/definitions/26_end_txn | 2 +- generate/definitions/28_txn_offset_commit | 2 +- generate/definitions/66_list_transactions | 6 +- .../definitions/69_consumer_group_describe | 67 + generate/definitions/misc | 6 +- generate/gen.go | 6 +- go.mod | 2 +- go.sum | 4 +- pkg/kadm/go.mod | 8 +- pkg/kadm/go.sum | 16 +- pkg/kadm/kadm.go | 14 +- pkg/kerr/kerr.go | 20 + pkg/kfake/data.go | 4 +- pkg/kgo/config.go | 6 +- pkg/kgo/consumer.go | 40 +- pkg/kgo/consumer_direct.go | 20 +- pkg/kgo/consumer_direct_test.go | 69 + pkg/kgo/consumer_group.go | 23 +- pkg/kgo/errors.go | 6 +- pkg/kgo/metadata.go | 9 + pkg/kgo/produce_request_test.go | 37 + pkg/kgo/producer.go | 22 +- pkg/kgo/record_and_fetch.go | 10 +- pkg/kgo/sink.go | 44 +- pkg/kgo/source.go | 1 + pkg/kgo/txn.go | 27 +- pkg/kmsg/generated.go | 1425 ++++++++++++++++- pkg/kversion/kversion.go | 36 +- pkg/sr/client.go | 21 +- pkg/sr/go.mod | 2 +- 39 files changed, 1883 insertions(+), 151 deletions(-) create mode 100644 generate/definitions/69_consumer_group_describe diff --git a/.golangci.yml b/.golangci.yml index 2895cca6..193366c2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,9 +24,9 @@ linters: - asciicheck - bidichk - bodyclose + - copyloopvar - durationcheck - exhaustive - - 
exportloopref - gocritic - gofmt - gofumpt @@ -74,6 +74,7 @@ linters-settings: excludes: - G104 # unhandled errors, we exclude for the same reason we do not use errcheck - G404 # we want math/rand + - G115 # irrelevant flags in this repo # Gocritic is a meta linter that has very good lints, and most of the # experimental ones are very good too. We opt into everything, which helps diff --git a/CHANGELOG.md b/CHANGELOG.md index 77fd3908..3bd3cbca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,58 @@ +v1.18.0 +=== + +This release adds support for Kafka 3.8, adds a few community requested APIs, +some internal improvements, and fixes two bugs. One of the bugfixes is for a +deadlock; it is recommended to bump to this release to ensure you do not run +into the deadlock. The features in this release are relatively small. + +This adds protocol support for [KIP-890][KIP-890] and [KIP-994][KIP-994], and +adds further protocol support for [KIP-848][KIP-848]. If you are using +transactions, you may see a new `kerr.TransactionAbortable` error, which +signals that your ongoing transaction should be aborted and will not be +successful if you try to commit it. + +[KIP-890]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense +[KIP-994]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-994%3A+Minor+Enhancements+to+ListTransactions+and+DescribeTransactions+APIs + +Lastly, there have been a few improvements to `pkg/sr` that are not mentioned +in these changelog notes. + +## Bug fixes + +* If you canceled the context used while producing while your client was + at the maximum buffered records or bytes, it was possible to experience + deadlocks. This has been fixed. See #832 for more details. + +* Previously, if using `GetConsumeTopics` while regex consuming, the function + would return all topics ever discovered. It now returns only the topics that + are being consumed. 
+ +## Improvements + +* The client now internally ignores OutOfOrderSequenceNumber errors that are + encountered when producing _if possible_. If a producer produces very infrequently, + it is possible the broker forgets the producer by the next time the producer + produces. In this case, the producer receives an OutOfOrderSequenceNumber error. + The client now internally resets properly so that you do not see the error. + +## Features + +* `AllowRebalance` and `CloseAllowingRebalance` have been added to `GroupTransactSession`. +* The `FetchTopic` type now includes the topic's `TopicID`. +* The `ErrGroupSession` internal error field is now public, allowing you to test how you handle the internal error. +* You may now receive a `kerr.TransactionAbortable` error from many functions while using transactions. + +## Relevant commits + +* [`0fd1959d`](https://github.com/twmb/franz-go/commit/0fd1959d) kgo: support Kafka 3.8's kip-890 modifications +* [`68163c55`](https://github.com/twmb/franz-go/commit/68163c55) **bugfix** kgo: do not add all topics to internal tps map when regex consuming +* [`3548d1f7`](https://github.com/twmb/franz-go/commit/3548d1f7) **improvement** kgo: ignore OOOSN where possible +* [`6a759401`](https://github.com/twmb/franz-go/commit/6a759401) **bugfix** kgo: fix potential deadlock when reaching max buffered (records|bytes) +* [`4bfb0c68`](https://github.com/twmb/franz-go/commit/4bfb0c68) **feature** kgo: add TopicID to the FetchTopic type +* [`06a9c47d`](https://github.com/twmb/franz-go/commit/06a9c47d) **feature** kgo: export the wrapped error from ErrGroupSession +* [`4affe8ef`](https://github.com/twmb/franz-go/commit/4affe8ef) **feature** kgo: add AllowRebalance and CloseAllowingRebalance to GroupTransactSession + v1.17.1 === diff --git a/README.md b/README.md index 76cbfb01..2a0d01bb 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ This library attempts to provide an intuitive API while interacting with Kafka t ## Features -
Feature complete client (Kafka >= 0.8.0 through v3.4+) +- Feature complete client (Kafka >= 0.8.0 through v3.8+) _minus_ the next generation group protocol - Full Exactly-Once-Semantics (EOS) - Idempotent & transactional producers - Simple (legacy) consumer @@ -403,11 +403,13 @@ generation. | [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft) — `AlterPartition.TopicID` | 3.3 | Supported | | [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) — Next gen consumer rebalance protocol | 3.7 | Unsupported (proto supported) | | [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported | +| [KIP-890](https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense) — Transactions server side defense | 3.8 (partial) | Supported | | [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported | | [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? 
| Supported (`UpdateSeedBrokers`) | | [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR) — Stale broker epoch fencing | 3.5 | Supported (proto) | | [KIP-919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration) — Admin client talk to KRaft , Controller registration | 3.7 | Supported (proto) | | [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client) — Leader discovery optimizations | 3.7 | Supported | +| [KIP-994](https://cwiki.apache.org/confluence/display/KAFKA/KIP-994%3A+Minor+Enhancements+to+ListTransactions+and+DescribeTransactions+APIs) — List/Describe transactions enhancements | 3.8 (partial) | Supported | Missing from above but included in librdkafka is: diff --git a/generate/definitions/00_produce b/generate/definitions/00_produce index 4afd2651..4addcd2a 100644 --- a/generate/definitions/00_produce +++ b/generate/definitions/00_produce @@ -6,7 +6,7 @@ // Note that the special client ID "__admin_client" will allow you to produce // records to internal topics. This is generally recommended if you want to // break your Kafka cluster. -ProduceRequest => key 0, max version 10, flexible v9+ +ProduceRequest => key 0, max version 11, flexible v9+ // TransactionID is the transaction ID to use for this request, allowing for // exactly once semantics. TransactionID: nullable-string // v3+ diff --git a/generate/definitions/10_find_coordinator b/generate/definitions/10_find_coordinator index 5c5d8cdd..238f2a87 100644 --- a/generate/definitions/10_find_coordinator +++ b/generate/definitions/10_find_coordinator @@ -3,7 +3,7 @@ // This coordinator is different from the broker leader coordinator. This // coordinator is the partition leader for the partition that is storing // the group or transaction ID. 
-FindCoordinatorRequest => key 10, max version 4, flexible v3+ +FindCoordinatorRequest => key 10, max version 5, flexible v3+ // CoordinatorKey is the ID to use for finding the coordinator. For groups, // this is the group name, for transactional producer, this is the // transactional ID. diff --git a/generate/definitions/16_list_groups b/generate/definitions/16_list_groups index 02fb6b04..c09453f6 100644 --- a/generate/definitions/16_list_groups +++ b/generate/definitions/16_list_groups @@ -1,12 +1,15 @@ // ListGroupsRequest issues a request to list all groups. // // To list all groups in a cluster, this must be issued to every broker. -ListGroupsRequest => key 16, max version 4, flexible v3+ +ListGroupsRequest => key 16, max version 5, flexible v3+ // StatesFilter, proposed in KIP-518 and introduced in Kafka 2.6.0, // allows filtering groups by state, where a state is any of // "Preparing", "PreparingRebalance", "CompletingRebalance", "Stable", // "Dead", or "Empty". If empty, all groups are returned. StatesFilter: [string] // v4+ + // TypesFilter, part of KIP-848, filters the types of groups we want + // to list. If empty, all groups are returned. + TypesFilter: [string] // v5+ // ListGroupsResponse is returned from a ListGroupsRequest. ListGroupsResponse => @@ -25,3 +28,5 @@ ListGroupsResponse => ProtocolType: string // The group state. GroupState: string // v4+ + // The group type. + GroupType: string // v5+ diff --git a/generate/definitions/22_init_producer_id b/generate/definitions/22_init_producer_id index 2afbf188..18df2313 100644 --- a/generate/definitions/22_init_producer_id +++ b/generate/definitions/22_init_producer_id @@ -4,7 +4,7 @@ // // Note that you do not need to go to a txn coordinator if you are initializing // a producer id without a transactional id. 
-InitProducerIDRequest => key 22, max version 4, flexible v2+, txn coordinator +InitProducerIDRequest => key 22, max version 5, flexible v2+, txn coordinator // TransactionalID is the ID to use for transactions if using transactions. TransactionalID: nullable-string // TransactionTimeoutMillis is how long a transaction is allowed before diff --git a/generate/definitions/24_add_partitions_to_txn b/generate/definitions/24_add_partitions_to_txn index dc8b757b..355f2803 100644 --- a/generate/definitions/24_add_partitions_to_txn +++ b/generate/definitions/24_add_partitions_to_txn @@ -8,7 +8,7 @@ // // Version 4 adds VerifyOnly field to check if partitions are already in // transaction and adds support to batch multiple transactions. -AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator +AddPartitionsToTxnRequest => key 24, max version 5, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. TransactionalID: string // v0-v3 // ProducerID is the producer ID of the client for this transactional ID diff --git a/generate/definitions/25_add_offsets_to_txn b/generate/definitions/25_add_offsets_to_txn index bd52bcac..a7750a3b 100644 --- a/generate/definitions/25_add_offsets_to_txn +++ b/generate/definitions/25_add_offsets_to_txn @@ -6,7 +6,7 @@ // Internally, this request simply adds the __consumer_offsets topic as a // partition for this transaction with AddPartitionsToTxn for the partition // in that topic that contains the group. -AddOffsetsToTxnRequest => key 25, max version 3, flexible v3+, txn coordinator +AddOffsetsToTxnRequest => key 25, max version 4, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. 
TransactionalID: string // ProducerID is the producer ID of the client for this transactional ID diff --git a/generate/definitions/26_end_txn b/generate/definitions/26_end_txn index 6b569817..d571b03f 100644 --- a/generate/definitions/26_end_txn +++ b/generate/definitions/26_end_txn @@ -1,6 +1,6 @@ // EndTxnRequest ends a transaction. This should be called after // TxnOffsetCommitRequest. -EndTxnRequest => key 26, max version 3, flexible v3+, txn coordinator +EndTxnRequest => key 26, max version 4, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. TransactionalID: string // ProducerID is the producer ID of the client for this transactional ID diff --git a/generate/definitions/28_txn_offset_commit b/generate/definitions/28_txn_offset_commit index 73186821..bbf743dd 100644 --- a/generate/definitions/28_txn_offset_commit +++ b/generate/definitions/28_txn_offset_commit @@ -1,7 +1,7 @@ // TxnOffsetCommitRequest sends offsets that are a part of this transaction // to be committed once the transaction itself finishes. This effectively // replaces OffsetCommitRequest for when using transactions. -TxnOffsetCommitRequest => key 28, max version 3, flexible v3+, group coordinator +TxnOffsetCommitRequest => key 28, max version 4, flexible v3+, group coordinator // TransactionalID is the transactional ID to use for this request. 
TransactionalID: string // Group is the group consumed in this transaction and to be used for diff --git a/generate/definitions/66_list_transactions b/generate/definitions/66_list_transactions index 216ac02c..bf04ac93 100644 --- a/generate/definitions/66_list_transactions +++ b/generate/definitions/66_list_transactions @@ -8,8 +8,12 @@ ListTransactionsRequest => key 66, max version 0, flexible v0+ StateFilters: [string] // The producer IDs to filter by: if empty, all transactions will be // returned; if non-empty, only transactions which match one of the filtered - // producer IDs will be returned + // producer IDs will be returned. ProducerIDFilters: [int64] + // Duration (in millis) to filter by: if < 0, all transactions will be + // returned; otherwise, only transactions running longer than this duration + // will be returned. + DurationFilterMillis: int64(-1) // v1+ // ListTransactionsResponse is a response to a ListTransactionsRequest. ListTransactionsResponse => diff --git a/generate/definitions/69_consumer_group_describe b/generate/definitions/69_consumer_group_describe new file mode 100644 index 00000000..463863dd --- /dev/null +++ b/generate/definitions/69_consumer_group_describe @@ -0,0 +1,67 @@ +// Assignment contains consumer group assignments. +Assignment => not top level, no encoding, flexible v0+ + // The topics & partitions assigned to the member. + TopicPartitions: [=>] + TopicID: uuid + Topic: string + Partitions: [int32] + +// ConsumerGroupDescribe is a part of KIP-848; this is the +// "next generation" equivalent of DescribeGroups. +ConsumerGroupDescribeRequest => key 69, max version 0, flexible v0+ + // The IDs of the groups to describe. + Groups: [string] + // Whether to include authorized operations. + IncludeAuthorizedOperations: bool + +// ConsumerGroupDescribeResponse is returned from a ConsumerGroupDescribeRequest. +ConsumerGroupDescribeResponse => + ThrottleMillis + Groups: [=>] + // ErrorCode is the error for this response. 
+ // + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + ErrorCode: int16 + // A supplementary message if this errored. + ErrorMessage: nullable-string + // The group ID. + Group: string + // The group state. + State: string + // The group epoch. + Epoch: int32 + // The assignment epoch. + AssignmentEpoch: int32 + // The selected assignor. + AssignorName: string + // Members of the group. + Members: [=>] + // The member ID. + MemberID: string + // The member instance ID, if any. + InstanceID: nullable-string + // The member rack ID, if any. + RackID: nullable-string + // The current member epoch. + MemberEpoch: int32 + // The client ID. + ClientID: string + // The client host. + ClientHost: string + // The subscribed topic names. + SubscribedTopics: [string] + // The subscribed topic regex, if any. + SubscribedTopicRegex: nullable-string + // The current assignment. + Assignment: Assignment + // The target assignment. + TargetAssignment: Assignment + // 32 bit bitfield representing authorized operations for the group. + AuthorizedOperations: int32(-2147483648) diff --git a/generate/definitions/misc b/generate/definitions/misc index 8166cee2..7c3c5cd8 100644 --- a/generate/definitions/misc +++ b/generate/definitions/misc @@ -272,7 +272,7 @@ GroupMetadataKey => not top level, with version field // // KAFKA-7862 commit 0f995ba6be, proposed in KIP-345 and included in 2.3.0 // released version 3. -GroupMetadataValue => not top level, with version field +GroupMetadataValue => not top level, with version field, flexible v4+ // Version is the version of this value. 
Version: int16 // ProtocolType is the type of protocol being used for the group @@ -287,7 +287,7 @@ GroupMetadataValue => not top level, with version field Leader: nullable-string // CurrentStateTimestamp is the timestamp for this state of the group // (stable, etc.). - CurrentStateTimestamp: int64 // v2+ + CurrentStateTimestamp: int64(-1) // v2+ // Members are the group members. Members: [=>] // MemberID is a group member. @@ -299,7 +299,7 @@ GroupMetadataValue => not top level, with version field // ClientHost is the hostname of this group member. ClientHost: string // RebalanceTimeoutMillis is the rebalance timeout of this group member. - RebalanceTimeoutMillis: int32 // v1+ + RebalanceTimeoutMillis: int32(-1) // v1+ // SessionTimeoutMillis is the session timeout of this group member. SessionTimeoutMillis: int32 // Subscription is the subscription of this group member. diff --git a/generate/gen.go b/generate/gen.go index fec90d1c..4f7cf8e3 100644 --- a/generate/gen.go +++ b/generate/gen.go @@ -625,7 +625,7 @@ func (s Struct) WriteDefault(l *LineWriter) { func (s Struct) WriteDefn(l *LineWriter) { if s.Comment != "" { - l.Write(s.Comment) + l.Write(s.Comment) //nolint:govet // ... } l.Write("type %s struct {", s.Name) if s.TopLevel { @@ -822,7 +822,7 @@ func (s Struct) WriteNewPtrFunc(l *LineWriter) { func (e Enum) WriteDefn(l *LineWriter) { if e.Comment != "" { - l.Write(e.Comment) + l.Write(e.Comment) //nolint:govet // ... l.Write("// ") } l.Write("// Possible values and their meanings:") @@ -830,7 +830,7 @@ func (e Enum) WriteDefn(l *LineWriter) { for _, v := range e.Values { l.Write("// * %d (%s)", v.Value, v.Word) if len(v.Comment) > 0 { - l.Write(v.Comment) + l.Write(v.Comment) //nolint:govet // ... 
} l.Write("//") } diff --git a/go.mod b/go.mod index b29fcb56..2fa3ece2 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/klauspost/compress v1.17.8 github.com/pierrec/lz4/v4 v4.1.21 - github.com/twmb/franz-go/pkg/kmsg v1.8.0 + github.com/twmb/franz-go/pkg/kmsg v1.9.0 golang.org/x/crypto v0.23.0 ) diff --git a/go.sum b/go.sum index 80dc0fb0..757ece4c 100644 --- a/go.sum +++ b/go.sum @@ -2,7 +2,7 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= -github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= diff --git a/pkg/kadm/go.mod b/pkg/kadm/go.mod index 5039ecdc..ccce0ab6 100644 --- a/pkg/kadm/go.mod +++ b/pkg/kadm/go.mod @@ -5,12 +5,12 @@ go 1.21 toolchain go1.22.0 require ( - github.com/twmb/franz-go v1.16.1 - github.com/twmb/franz-go/pkg/kmsg v1.8.0 - golang.org/x/crypto v0.23.0 + github.com/twmb/franz-go v1.18.0 + github.com/twmb/franz-go/pkg/kmsg v1.9.0 + golang.org/x/crypto v0.28.0 ) require ( - github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect ) diff --git a/pkg/kadm/go.sum b/pkg/kadm/go.sum index 3a6f5aa5..b97cbf55 100644 --- a/pkg/kadm/go.sum +++ b/pkg/kadm/go.sum @@ -1,10 
+1,10 @@ -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= -github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= -github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= -github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= +github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= diff --git a/pkg/kadm/kadm.go b/pkg/kadm/kadm.go index 432fc4c7..93df0868 100644 --- a/pkg/kadm/kadm.go +++ b/pkg/kadm/kadm.go @@ -191,6 +191,16 @@ type Offset struct { Metadata string // Metadata, if non-empty, is used for offset commits. 
} +// NewOffsetFromRecord is a helper to create an Offset for a given Record +func NewOffsetFromRecord(record *kgo.Record) Offset { + return Offset{ + Topic: record.Topic, + Partition: record.Partition, + At: record.Offset + 1, + LeaderEpoch: record.LeaderEpoch, + } +} + // Partitions wraps many partitions. type Partitions []Partition @@ -372,7 +382,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets { return } r := p.Records[len(p.Records)-1] - os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) + os.Add(NewOffsetFromRecord(r)) }) return os } @@ -383,7 +393,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets { func OffsetsFromRecords(rs ...kgo.Record) Offsets { os := make(Offsets) for _, r := range rs { - os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) + os.Add(NewOffsetFromRecord(&r)) } return os } diff --git a/pkg/kerr/kerr.go b/pkg/kerr/kerr.go index 731a23a1..1f408783 100644 --- a/pkg/kerr/kerr.go +++ b/pkg/kerr/kerr.go @@ -190,6 +190,21 @@ var ( MismatchedEndpointType = &Error{"MISMATCHED_ENDPOINT_TYPE", 114, false, "The request was sent to an endpoint of the wrong type."} UnsupportedEndpointType = &Error{"UNSUPPORTED_ENDPOINT_TYPE", 115, false, "This endpoint type is not supported yet."} UnknownControllerID = &Error{"UNKNOWN_CONTROLLER_ID", 116, false, "This controller ID is not known"} + + // UnknownSubscriptionID = &Error{"UNKNOWN_SUBSCRIPTION_ID", 117, false, "Client sent a push telemetry request with an invalid or outdated subscription ID."} + // TelemetryTooLarge = &Error{"TELEMETRY_TOO_LARGE", 118, false, "Client sent a push telemetry request larger than the maximum size the broker will accept."} + // InvalidRegistration = &Error{"INVALID_REGISTRATION", 119, false, "The controller has considered the broker registration to be invalid."} + + TransactionAbortable = &Error{"TRANSACTION_ABORTABLE", 120, false, "The server encountered an error with the transaction. 
The client can abort the transaction to continue using this transactional ID."} + + // InvalidRecordState = &Error{"INVALID_RECORD_STATE", 121, false, "The record state is invalid. The acknowledgement of delivery could not be completed."} + // ShareSessionNotFound = &Error{"SHARE_SESSION_NOT_FOUND", 122, false, "The share session was not found."} + // InvalidShareSessionEpoch = &Error{"INVALID_SHARE_SESSION_EPOCH", 123, false, "The share session epoch is invalid."} + // FencedStateEpoch = &Error{"FENCED_STATE_EPOCH", 124, false, "The share coordinator rejected the request because the share-group state epoch did not match."} + // InvalidVoterKey = &Error{"INVALID_VOTER_KEY", 125, false, "The voter key doesn't match the receiving replica's key."} + // DuplicateVoter = &Error{"DUPLICATE_VOTER", 126, false, "The voter is already part of the set of voters."} + // VoterNotFound = &Error{"VOTER_NOT_FOUND", 127, false, "The voter is not part of the set of voters."} + // InvalidRegularExpression = &Error{"INVALID_REGULAR_EXPRESSION", 128, false, "The regular expression is not valid."} ) var code2err = map[int16]error{ @@ -312,4 +327,9 @@ var code2err = map[int16]error{ 115: UnsupportedEndpointType, // "" 116: UnknownControllerID, // "" + // 117: UnknownSubscriptionID, // KIP-714 f1819f448 KAFKA-15778 & KAFKA-15779 + // 118: TelemetryTooLarge, // "" + // 119: InvalidRegistration, // KIP-858 f467f6bb4 KAFKA-15361 + + 120: TransactionAbortable, // KIP-890 2e8d69b78 KAFKA-16314 } diff --git a/pkg/kfake/data.go b/pkg/kfake/data.go index 9f5d46c6..566d51b6 100644 --- a/pkg/kfake/data.go +++ b/pkg/kfake/data.go @@ -92,7 +92,9 @@ func (d *data) mkt(t string, nparts int, nreplicas int, configs map[string]*stri d.id2t[id] = t d.t2id[t] = id d.treplicas[t] = nreplicas - d.tcfgs[t] = configs + if configs != nil { + d.tcfgs[t] = configs + } for i := 0; i < nparts; i++ { d.tps.mkp(t, int32(i), d.c.newPartData) } diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 20006472..8c5edefc 
100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1122,8 +1122,8 @@ func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt { // For Kafka-to-Kafka transactions, the transactional ID is only one half of // the equation. You must also assign a group to consume from. // -// To produce transactionally, you first BeginTransaction, then produce records -// consumed from a group, then you EndTransaction. All records produced outside +// To produce transactionally, you first [BeginTransaction], then produce records +// consumed from a group, then you [EndTransaction]. All records produced outside // of a transaction will fail immediately with an error. // // After producing a batch, you must commit what you consumed. Auto committing @@ -1470,7 +1470,7 @@ func Balancers(balancers ...GroupBalancer) GroupOpt { // in this timeout, the broker will remove the member from the group and // initiate a rebalance. // -// If you are using a GroupTransactSession for EOS, wish to lower this, and are +// If you are using a [GroupTransactSession] for EOS, wish to lower this, and are // talking to a Kafka cluster pre 2.5, consider lowering the // TransactionTimeout. If you do not, you risk a transaction finishing after a // group has rebalanced, which could lead to duplicate processing. If you are diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 01f98da4..eb15ec00 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -620,7 +620,7 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s // ResumeFetchTopics resumes fetching the input topics if they were previously // paused. Resuming topics that are not currently paused is a per-topic no-op. -// See the documentation on PauseTfetchTopics for more details. +// See the documentation on PauseFetchTopics for more details. 
func (cl *Client) ResumeFetchTopics(topics ...string) { defer cl.allSinksAndSources(func(sns sinkAndSource) { sns.source.maybeConsume() @@ -1186,6 +1186,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how } } +// filterMetadataAllTopics, called BEFORE doOnMetadataUpdate, evaluates +// all topics received against the user provided regex. +func (c *consumer) filterMetadataAllTopics(topics []string) []string { + c.mu.Lock() + defer c.mu.Unlock() + + var rns reNews + defer rns.log(&c.cl.cfg) + + var reSeen map[string]bool + if c.d != nil { + reSeen = c.d.reSeen + } else { + reSeen = c.g.reSeen + } + + keep := topics[:0] + for _, topic := range topics { + want, seen := reSeen[topic] + if !seen { + for rawRe, re := range c.cl.cfg.topics { + if want = re.MatchString(topic); want { + rns.add(rawRe, topic) + break + } + } + if !want { + rns.skip(topic) + } + reSeen[topic] = want + } + if want { + keep = append(keep, topic) + } + } + return keep +} + func (c *consumer) doOnMetadataUpdate() { if !c.consuming() { return diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index bf42dbca..0dcbf989 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -65,29 +65,11 @@ func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { topics := d.tps.load() - var rns reNews - if d.cfg.regex { - defer rns.log(d.cfg) - } - toUse := make(map[string]map[int32]Offset, 10) for topic, topicPartitions := range topics { var useTopic bool if d.cfg.regex { - want, seen := d.reSeen[topic] - if !seen { - for rawRe, re := range d.cfg.topics { - if want = re.MatchString(topic); want { - rns.add(rawRe, topic) - break - } - } - if !want { - rns.skip(topic) - } - d.reSeen[topic] = want - } - useTopic = want + useTopic = d.reSeen[topic] } else { useTopic = d.m.onlyt(topic) } diff --git a/pkg/kgo/consumer_direct_test.go 
b/pkg/kgo/consumer_direct_test.go index dc12083d..736730e8 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "sync/atomic" "testing" @@ -536,3 +537,71 @@ func TestIssue648(t *testing.T) { t.Errorf("did not see ErrUnknownTopicOrPartition") } } + +func TestIssue810(t *testing.T) { + t.Parallel() + + t1, cleanup1 := tmpTopicPartitions(t, 1) + defer cleanup1() + + _, cleanup2 := tmpTopicPartitions(t, 1) + defer cleanup2() + + // Non-regex consuming: topics are available immediately. + { + cl, _ := newTestClient( + ConsumeTopics(t1), + UnknownTopicRetries(-1), + ) + defer cl.Close() + + topics := cl.GetConsumeTopics() + exp := []string{t1} + + if !reflect.DeepEqual(topics, exp) { + t.Errorf("non-regex got %v != exp %v", topics, exp) + } + } + + // Regex consuming: topics are available only after discovery. + { + cl, _ := newTestClient( + ConsumeTopics(t1), + ConsumeRegex(), + UnknownTopicRetries(-1), + MetadataMaxAge(time.Second), + MetadataMinAge(100*time.Millisecond), + ) + defer cl.Close() + + var ( + ticker = time.NewTicker(100 * time.Millisecond) + fail = time.NewTimer(15 * time.Second) + failed bool + lastSaw []string + exp = []string{t1} + ) + + defer ticker.Stop() + defer fail.Stop() + + out: + for { + select { + case <-ticker.C: + lastSaw = cl.GetConsumeTopics() + if reflect.DeepEqual(lastSaw, exp) { + break out + } + cl.ForceMetadataRefresh() + case <-fail.C: + failed = true + break out + } + } + + if failed { + t.Errorf("did not see expected topics in time, last saw %v != exp %v", lastSaw, exp) + } + } +} diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index c1946eb4..dee54bd7 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -27,8 +27,7 @@ type groupConsumer struct { cooperative atomicBool // true if the group balancer chosen during Join is cooperative // The data for topics that the user assigned. 
Metadata updates the - // atomic.Value in each pointer atomically. If we are consuming via - // regex, metadata grabs the lock to add new topics. + // atomic.Value in each pointer atomically. tps *topicsPartitions reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not @@ -1714,11 +1713,6 @@ func (g *groupConsumer) findNewAssignments() { delta int } - var rns reNews - if g.cfg.regex { - defer rns.log(&g.cl.cfg) - } - var numNewTopics int toChange := make(map[string]change, len(topics)) for topic, topicPartitions := range topics { @@ -1741,20 +1735,7 @@ func (g *groupConsumer) findNewAssignments() { // support adding new regex). useTopic := true if g.cfg.regex { - want, seen := g.reSeen[topic] - if !seen { - for rawRe, re := range g.cfg.topics { - if want = re.MatchString(topic); want { - rns.add(rawRe, topic) - break - } - } - if !want { - rns.skip(topic) - } - g.reSeen[topic] = want - } - useTopic = want + useTopic = g.reSeen[topic] } // We only track using the topic if there are partitions for diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 3ff1dbfe..9af0c81c 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -311,11 +311,11 @@ func (e *errUnknownCoordinator) Error() string { // consumer group member was kicked from the group or was never able to join // the group. 
type ErrGroupSession struct { - err error + Err error } func (e *ErrGroupSession) Error() string { - return fmt.Sprintf("unable to join group session: %v", e.err) + return fmt.Sprintf("unable to join group session: %v", e.Err) } -func (e *ErrGroupSession) Unwrap() error { return e.err } +func (e *ErrGroupSession) Unwrap() error { return e.Err } diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 33cac641..cbe4b5ad 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -350,6 +350,14 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) { for topic := range latest { allTopics = append(allTopics, topic) } + + // We filter out topics that will not match any of our regexes. + // This ensures that the `tps` field does not contain topics + // we will never use (the client works with misc. topics in + // there, but it's better to avoid it -- and allows us to use + // `tps` in GetConsumeTopics). + allTopics = c.filterMetadataAllTopics(allTopics) + tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics) defer tpsConsumer.storeData(tpsConsumerLoad) @@ -504,6 +512,7 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti failing: mp.loadErr != 0, sink: mp.sns.sink, topicPartitionData: td, + lastAckedOffset: -1, } } else { p.cursor = &cursor{ diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go index 1270354a..220279a7 100644 --- a/pkg/kgo/produce_request_test.go +++ b/pkg/kgo/produce_request_test.go @@ -195,6 +195,43 @@ func TestIssue769(t *testing.T) { } } +func TestIssue831(t *testing.T) { + t.Parallel() + + topic, cleanup := tmpTopic(t) + defer cleanup() + + cl, _ := newTestClient( + DefaultProduceTopic(topic), + UnknownTopicRetries(-1), + MaxBufferedRecords(1), + ) + defer cl.Close() + + var wg sync.WaitGroup + for i := 0; i < 500; i++ { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + wg.Add(1) + go cl.Produce(ctx, &Record{Value: []byte("foo")}, func(*Record, 
error) { + wg.Done() + }) + } + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-done: + case <-time.After(15 * time.Second): + t.Fatal("still trying to produce after delay") + } +} + // This file contains golden tests against kmsg AppendTo's to ensure our custom // encoding is correct. diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index d9cca992..1ea2a29f 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -480,12 +480,24 @@ func (cl *Client) produce( }() drainBuffered := func(err error) { - p.mu.Lock() - quit = true + // The expected case here is that a context was + // canceled while we were waiting for space, so we are + // exiting and need to kill the goro above. + // + // However, it is possible that the goro above has + // already exited AND the context was canceled, and + // `select` chose the context-canceled case. + // + // So, to avoid a deadlock, we need to wakeup the + // goro above in another goroutine. + go func() { + p.mu.Lock() + quit = true + p.mu.Unlock() + p.c.Broadcast() + }() + <-wait // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked) p.mu.Unlock() - p.c.Broadcast() // wake the goroutine above - <-wait - p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked) p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err) } diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index c82d7539..45720666 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -122,7 +122,7 @@ type Record struct { // before the record is unbuffered. ProducerEpoch int16 - // ProducerEpoch is the producer ID of this message if it was produced + // ProducerID is the producer ID of this message if it was produced // with a producer ID. An epoch and ID of 0 means it was not. // // For producing, this is left unset. 
This will be set by the client @@ -312,6 +312,9 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) { type FetchTopic struct { // Topic is the topic this is for. Topic string + // TopicID is the ID of the topic, if your cluster supports returning + // topic IDs in fetch responses (Kafka 3.1+). + TopicID [16]byte // Partitions contains individual partitions in the topic that were // fetched. Partitions []FetchPartition @@ -383,8 +386,8 @@ type FetchError struct { // client for. // // 3. an untyped batch parse failure; these are usually unrecoverable by -// restarts, and it may be best to just let the client continue. However, -// restarting is an option, but you may need to manually repair your +// restarts, and it may be best to just let the client continue. +// Restarting is an option, but you may need to manually repair your // partition. // // 4. an injected ErrClientClosed; this is a fatal informational error that @@ -598,6 +601,7 @@ func (fs Fetches) EachTopic(fn func(FetchTopic)) { for topic, partitions := range topics { fn(FetchTopic{ topic, + [16]byte{}, partitions, }) } diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 6d0f3dfe..2b96d916 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -406,6 +406,7 @@ func (s *sink) produce(sem <-chan struct{}) bool { if txnReq != nil { // txnReq can fail from: + // - TransactionAbortable // - retry failure // - auth failure // - producer id mapping / epoch errors @@ -417,6 +418,10 @@ func (s *sink) produce(sem <-chan struct{}) bool { batchesStripped, err := s.doTxnReq(req, txnReq) if err != nil { switch { + case errors.Is(err, kerr.TransactionAbortable): + // If we get TransactionAbortable, we continue into producing. + // The produce will fail with the same error, and this is the + // only way to notify the user to abort the txn. 
case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err): s.cl.bumpRepeatedLoadErr(err) s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err) @@ -431,8 +436,8 @@ func (s *sink) produce(sem <-chan struct{}) bool { // with produce request vs. end txn (KAFKA-12671) s.cl.failProducerID(id, epoch, err) s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err) + return false } - return false } // If we stripped everything, ensure we backoff to force a @@ -563,7 +568,7 @@ func (s *sink) issueTxnReq( continue } for _, partition := range topic.Partitions { - if err := kerr.ErrorForCode(partition.ErrorCode); err != nil { + if err := kerr.ErrorForCode(partition.ErrorCode); err != nil && err != kerr.TransactionAbortable { // see below for txn abortable // OperationNotAttempted is set for all partitions that are authorized // if any partition is unauthorized _or_ does not exist. We simply remove // unattempted partitions and treat them as retryable. @@ -854,6 +859,20 @@ func (s *sink) handleReqRespBatch( // handling, but KIP-360 demonstrated that resetting sequence // numbers is fundamentally unsafe, so we treat it like OOOSN. // + // KAFKA-5793 specifically mentions for OOOSN "when you get it, + // it should always mean data loss". Sometime after KIP-360, + // Kafka changed the client to remove all places + // UnknownProducerID was returned, and then started referring + // to OOOSN as retryable. KIP-890 definitively says OOOSN is + // retryable. However, the Kafka source as of 24-10-10 still + // only retries OOOSN for batches that are NOT the expected + // next batch (i.e., it's next + 1, for when there are multiple + // in flight). 
With KIP-890, we still just disregard whatever + supposedly non-retryable / actually-is-retryable error is + returned if the LogStartOffset is _after_ what we previously + produced. Specifically, this is step (4) in the wiki link + within KAFKA-5793. + // + // InvalidMapping is similar to UnknownProducerID, but occurs // when the txnal coordinator timed out our transaction. // @@ -881,6 +900,22 @@ func (s *sink) handleReqRespBatch( // txn coordinator requests, which have PRODUCER_FENCED vs // TRANSACTION_TIMED_OUT. + if batch.owner.lastAckedOffset >= 0 && rp.LogStartOffset > batch.owner.lastAckedOffset { + s.cl.cfg.logger.Log(LogLevelInfo, "partition prefix truncation to after our last produce caused the broker to forget us; no loss occurred, bumping producer epoch and resetting sequence numbers", + "broker", logID(s.nodeID), + "topic", topic, + "partition", rp.Partition, + "producer_id", producerID, + "producer_epoch", producerEpoch, + "err", err, + ) + s.cl.failProducerID(producerID, producerEpoch, errReloadProducerID) + if debug { + fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", rp.BaseOffset, nrec, err) + } + return true, false + } + if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss { s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID", "broker", logID(s.nodeID), @@ -951,6 +986,7 @@ func (s *sink) handleReqRespBatch( ) } else { batch.owner.okOnSink = true + batch.owner.lastAckedOffset = rp.BaseOffset + int64(len(batch.records)) } s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err) didProduce = err == nil @@ -1222,6 +1258,8 @@ type recBuf struct { // to drain. inflight uint8 + lastAckedOffset int64 // last ProduceResponse's BaseOffset + how many records we produced + topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated) // seq is used for the seq in each record batch. 
It is incremented when @@ -2057,7 +2095,7 @@ func (b *recBatch) tryBuffer(pr promisedRec, produceVersion, maxBatchBytes int32 ////////////// func (*produceRequest) Key() int16 { return 0 } -func (*produceRequest) MaxVersion() int16 { return 10 } +func (*produceRequest) MaxVersion() int16 { return 11 } func (p *produceRequest) SetVersion(v int16) { p.version = v } func (p *produceRequest) GetVersion() int16 { return p.version } func (p *produceRequest) IsFlexible() bool { return p.version >= 9 } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 51f87374..74070b1c 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1115,6 +1115,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe fetchTopic := FetchTopic{ Topic: topic, + TopicID: rt.TopicID, Partitions: make([]FetchPartition, 0, len(rt.Partitions)), } diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 25cfd443..304f28b4 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -159,6 +159,18 @@ func (s *GroupTransactSession) Close() { s.cl.Close() } +// AllowRebalance is a wrapper around Client.AllowRebalance, with the exact +// same semantics. Refer to that function's documentation. +func (s *GroupTransactSession) AllowRebalance() { + s.cl.AllowRebalance() +} + +// CloseAllowingRebalance is a wrapper around Client.CloseAllowingRebalance, +// with the exact same semantics. Refer to that function's documentation. +func (s *GroupTransactSession) CloseAllowingRebalance() { + s.cl.CloseAllowingRebalance() +} + // PollFetches is a wrapper around Client.PollFetches, with the exact same // semantics. Refer to that function's documentation. 
// @@ -281,7 +293,8 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry errors.Is(err, kerr.CoordinatorLoadInProgress), errors.Is(err, kerr.NotCoordinator), errors.Is(err, kerr.ConcurrentTransactions), - errors.Is(err, kerr.UnknownServerError): + errors.Is(err, kerr.UnknownServerError), + errors.Is(err, kerr.TransactionAbortable): return true } return false @@ -408,6 +421,11 @@ retry: willTryCommit = false goto retry + case errors.Is(endTxnErr, kerr.TransactionAbortable): + s.cl.cfg.logger.Log(LogLevelInfo, "end transaction returned TransactionAbortable; retrying as abort") + willTryCommit = false + goto retry + case errors.Is(endTxnErr, kerr.UnknownServerError): s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying") after := time.NewTimer(s.cl.cfg.retryBackoff(tries)) @@ -517,7 +535,7 @@ const ( // Deprecated: Kafka 3.6 removed support for the hacky behavior that // this option was abusing. Thus, as of Kafka 3.6, this option does not // work against Kafka. This option also has never worked for Redpanda - // becuse Redpanda always strictly validated that partitions were a + // because Redpanda always strictly validated that partitions were a // part of a transaction. Later versions of Kafka and Redpanda will // remove the need for AddPartitionsToTxn at all and thus this option // ultimately will be unnecessary anyway. @@ -820,8 +838,9 @@ func (cl *Client) UnsafeAbortBufferedRecords() { // // If the producer ID has an error and you are trying to commit, this will // return with kerr.OperationNotAttempted. If this happened, retry -// EndTransaction with TryAbort. Not other error is retryable, and you should -// not retry with TryAbort. +// EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you +// can retry with TryAbort. No other error is retryable, and you should not +// retry with TryAbort. 
// // If records failed with UnknownProducerID and your Kafka version is at least // 2.5, then aborting here will potentially allow the client to recover for diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 75bff995..584f6f37 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -13,7 +13,31 @@ import ( // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 68 +const MaxKey = 69 + +type AssignmentTopicPartition struct { + TopicID [16]byte + + Topic string + + Partitions []int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AssignmentTopicPartition. +func (v *AssignmentTopicPartition) Default() { +} + +// NewAssignmentTopicPartition returns a default AssignmentTopicPartition +// This is a shortcut for creating a struct and calling Default yourself. +func NewAssignmentTopicPartition() AssignmentTopicPartition { + var v AssignmentTopicPartition + v.Default() + return v +} // MessageV0 is the message format Kafka used prior to 0.10. // @@ -898,6 +922,8 @@ type GroupMetadataValueMember struct { ClientHost string // RebalanceTimeoutMillis is the rebalance timeout of this group member. + // + // This field has a default of -1. RebalanceTimeoutMillis int32 // v1+ // SessionTimeoutMillis is the session timeout of this group member. @@ -908,11 +934,15 @@ type GroupMetadataValueMember struct { // Assignment is what the leader assigned this group member. Assignment []byte + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v4+ } // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to GroupMetadataValueMember. 
func (v *GroupMetadataValueMember) Default() { + v.RebalanceTimeoutMillis = -1 } // NewGroupMetadataValueMember returns a default GroupMetadataValueMember @@ -956,22 +986,33 @@ type GroupMetadataValue struct { // CurrentStateTimestamp is the timestamp for this state of the group // (stable, etc.). + // + // This field has a default of -1. CurrentStateTimestamp int64 // v2+ // Members are the group members. Members []GroupMetadataValueMember + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v4+ } func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { version := v.Version _ = version + isFlexible := version >= 4 + _ = isFlexible { v := v.Version dst = kbin.AppendInt16(dst, v) } { v := v.ProtocolType - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } { v := v.Generation @@ -979,11 +1020,19 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Protocol - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } { v := v.Leader - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } if version >= 2 { v := v.CurrentStateTimestamp @@ -991,24 +1040,44 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Members - dst = kbin.AppendArrayLen(dst, len(v)) + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } for i := range v { v := &v[i] { v := v.MemberID - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } if version >= 3 { v := v.InstanceID - dst = kbin.AppendNullableString(dst, v) + if isFlexible { + dst = 
kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } } { v := v.ClientID - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } { v := v.ClientHost - dst = kbin.AppendString(dst, v) + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } } if version >= 1 { v := v.RebalanceTimeoutMillis @@ -1020,14 +1089,30 @@ func (v *GroupMetadataValue) AppendTo(dst []byte) []byte { } { v := v.Subscription - dst = kbin.AppendBytes(dst, v) + if isFlexible { + dst = kbin.AppendCompactBytes(dst, v) + } else { + dst = kbin.AppendBytes(dst, v) + } } { v := v.Assignment - dst = kbin.AppendBytes(dst, v) + if isFlexible { + dst = kbin.AppendCompactBytes(dst, v) + } else { + dst = kbin.AppendBytes(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) } } } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } return dst } @@ -1045,13 +1130,23 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { v.Version = b.Int16() version := v.Version _ = version + isFlexible := version >= 4 + _ = isFlexible s := v { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ProtocolType = v } @@ -1061,19 +1156,35 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { } { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + 
} } s.Protocol = v } { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.Leader = v } @@ -1085,7 +1196,11 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { v := s.Members a := v var l int32 - l = b.ArrayLen() + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } if !b.Ok() { return b.Complete() } @@ -1100,36 +1215,68 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.MemberID = v } if version >= 3 { var v *string - if unsafe { - v = b.UnsafeNullableString() + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } } else { - v = b.NullableString() + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } } s.InstanceID = v } { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ClientID = v } { var v string if unsafe { - v = b.UnsafeString() + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } } else { - v = b.String() + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } } s.ClientHost = v } @@ -1142,23 +1289,41 @@ func (v *GroupMetadataValue) readFrom(src []byte, unsafe bool) error { s.SessionTimeoutMillis = v } { - v := b.Bytes() + var v []byte + if isFlexible { + v = b.CompactBytes() + } else 
{ + v = b.Bytes() + } s.Subscription = v } { - v := b.Bytes() + var v []byte + if isFlexible { + v = b.CompactBytes() + } else { + v = b.Bytes() + } s.Assignment = v } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } } v = a s.Members = v } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } return b.Complete() } +func (v *GroupMetadataValue) IsFlexible() bool { return v.Version >= 4 } // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to GroupMetadataValue. func (v *GroupMetadataValue) Default() { + v.CurrentStateTimestamp = -1 } // NewGroupMetadataValue returns a default GroupMetadataValue @@ -2783,7 +2948,7 @@ type ProduceRequest struct { } func (*ProduceRequest) Key() int16 { return 0 } -func (*ProduceRequest) MaxVersion() int16 { return 10 } +func (*ProduceRequest) MaxVersion() int16 { return 11 } func (v *ProduceRequest) SetVersion(version int16) { v.Version = version } func (v *ProduceRequest) GetVersion() int16 { return v.Version } func (v *ProduceRequest) IsFlexible() bool { return v.Version >= 9 } @@ -3306,7 +3471,7 @@ type ProduceResponse struct { } func (*ProduceResponse) Key() int16 { return 0 } -func (*ProduceResponse) MaxVersion() int16 { return 10 } +func (*ProduceResponse) MaxVersion() int16 { return 11 } func (v *ProduceResponse) SetVersion(version int16) { v.Version = version } func (v *ProduceResponse) GetVersion() int16 { return v.Version } func (v *ProduceResponse) IsFlexible() bool { return v.Version >= 9 } @@ -12493,7 +12658,7 @@ type FindCoordinatorRequest struct { } func (*FindCoordinatorRequest) Key() int16 { return 10 } -func (*FindCoordinatorRequest) MaxVersion() int16 { return 4 } +func (*FindCoordinatorRequest) MaxVersion() int16 { return 5 } func (v *FindCoordinatorRequest) SetVersion(version int16) { v.Version = version } func (v *FindCoordinatorRequest) GetVersion() int16 { return v.Version } func (v *FindCoordinatorRequest) IsFlexible() bool { return 
v.Version >= 3 } @@ -12733,7 +12898,7 @@ type FindCoordinatorResponse struct { } func (*FindCoordinatorResponse) Key() int16 { return 10 } -func (*FindCoordinatorResponse) MaxVersion() int16 { return 4 } +func (*FindCoordinatorResponse) MaxVersion() int16 { return 5 } func (v *FindCoordinatorResponse) SetVersion(version int16) { v.Version = version } func (v *FindCoordinatorResponse) GetVersion() int16 { return v.Version } func (v *FindCoordinatorResponse) IsFlexible() bool { return v.Version >= 3 } @@ -15960,12 +16125,16 @@ type ListGroupsRequest struct { // "Dead", or "Empty". If empty, all groups are returned. StatesFilter []string // v4+ + // TypesFilter, part of KIP-848, filters the types of groups we want + // to list. If empty, all groups are returned. + TypesFilter []string // v5+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } func (*ListGroupsRequest) Key() int16 { return 16 } -func (*ListGroupsRequest) MaxVersion() int16 { return 4 } +func (*ListGroupsRequest) MaxVersion() int16 { return 5 } func (v *ListGroupsRequest) SetVersion(version int16) { v.Version = version } func (v *ListGroupsRequest) GetVersion() int16 { return v.Version } func (v *ListGroupsRequest) IsFlexible() bool { return v.Version >= 3 } @@ -16005,6 +16174,22 @@ func (v *ListGroupsRequest) AppendTo(dst []byte) []byte { } } } + if version >= 5 { + v := v.TypesFilter + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -16064,6 +16249,42 @@ func (v *ListGroupsRequest) readFrom(src []byte, unsafe bool) error { v = a s.StatesFilter = v } + if version >= 5 { + v := s.TypesFilter + a := v + var l int32 + if 
isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) + } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.TypesFilter = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -16101,6 +16322,9 @@ type ListGroupsResponseGroup struct { // The group state. GroupState string // v4+ + // The group type. + GroupType string // v5+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } @@ -16146,7 +16370,7 @@ type ListGroupsResponse struct { } func (*ListGroupsResponse) Key() int16 { return 16 } -func (*ListGroupsResponse) MaxVersion() int16 { return 4 } +func (*ListGroupsResponse) MaxVersion() int16 { return 5 } func (v *ListGroupsResponse) SetVersion(version int16) { v.Version = version } func (v *ListGroupsResponse) GetVersion() int16 { return v.Version } func (v *ListGroupsResponse) IsFlexible() bool { return v.Version >= 3 } @@ -16200,6 +16424,14 @@ func (v *ListGroupsResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } + if version >= 5 { + v := v.GroupType + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -16308,6 +16540,23 @@ func (v *ListGroupsResponse) readFrom(src []byte, unsafe bool) error { } s.GroupState = v } + if version >= 5 { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.GroupType = v + } if isFlexible { 
s.UnknownTags = internalReadTags(&b) } @@ -19348,7 +19597,7 @@ type InitProducerIDRequest struct { } func (*InitProducerIDRequest) Key() int16 { return 22 } -func (*InitProducerIDRequest) MaxVersion() int16 { return 4 } +func (*InitProducerIDRequest) MaxVersion() int16 { return 5 } func (v *InitProducerIDRequest) SetVersion(version int16) { v.Version = version } func (v *InitProducerIDRequest) GetVersion() int16 { return v.Version } func (v *InitProducerIDRequest) IsFlexible() bool { return v.Version >= 2 } @@ -19523,7 +19772,7 @@ type InitProducerIDResponse struct { } func (*InitProducerIDResponse) Key() int16 { return 22 } -func (*InitProducerIDResponse) MaxVersion() int16 { return 4 } +func (*InitProducerIDResponse) MaxVersion() int16 { return 5 } func (v *InitProducerIDResponse) SetVersion(version int16) { v.Version = version } func (v *InitProducerIDResponse) GetVersion() int16 { return v.Version } func (v *InitProducerIDResponse) IsFlexible() bool { return v.Version >= 2 } @@ -20367,7 +20616,7 @@ type AddPartitionsToTxnRequest struct { } func (*AddPartitionsToTxnRequest) Key() int16 { return 24 } -func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 4 } +func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 5 } func (v *AddPartitionsToTxnRequest) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnRequest) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -20953,7 +21202,7 @@ type AddPartitionsToTxnResponse struct { } func (*AddPartitionsToTxnResponse) Key() int16 { return 24 } -func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 4 } +func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 5 } func (v *AddPartitionsToTxnResponse) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnResponse) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnResponse) IsFlexible() bool { return v.Version 
>= 3 } @@ -21388,7 +21637,7 @@ type AddOffsetsToTxnRequest struct { } func (*AddOffsetsToTxnRequest) Key() int16 { return 25 } -func (*AddOffsetsToTxnRequest) MaxVersion() int16 { return 3 } +func (*AddOffsetsToTxnRequest) MaxVersion() int16 { return 4 } func (v *AddOffsetsToTxnRequest) SetVersion(version int16) { v.Version = version } func (v *AddOffsetsToTxnRequest) GetVersion() int16 { return v.Version } func (v *AddOffsetsToTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -21559,7 +21808,7 @@ type AddOffsetsToTxnResponse struct { } func (*AddOffsetsToTxnResponse) Key() int16 { return 25 } -func (*AddOffsetsToTxnResponse) MaxVersion() int16 { return 3 } +func (*AddOffsetsToTxnResponse) MaxVersion() int16 { return 4 } func (v *AddOffsetsToTxnResponse) SetVersion(version int16) { v.Version = version } func (v *AddOffsetsToTxnResponse) GetVersion() int16 { return v.Version } func (v *AddOffsetsToTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -21668,7 +21917,7 @@ type EndTxnRequest struct { } func (*EndTxnRequest) Key() int16 { return 26 } -func (*EndTxnRequest) MaxVersion() int16 { return 3 } +func (*EndTxnRequest) MaxVersion() int16 { return 4 } func (v *EndTxnRequest) SetVersion(version int16) { v.Version = version } func (v *EndTxnRequest) GetVersion() int16 { return v.Version } func (v *EndTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -21832,7 +22081,7 @@ type EndTxnResponse struct { } func (*EndTxnResponse) Key() int16 { return 26 } -func (*EndTxnResponse) MaxVersion() int16 { return 3 } +func (*EndTxnResponse) MaxVersion() int16 { return 4 } func (v *EndTxnResponse) SetVersion(version int16) { v.Version = version } func (v *EndTxnResponse) GetVersion() int16 { return v.Version } func (v *EndTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -22678,7 +22927,7 @@ type TxnOffsetCommitRequest struct { } func (*TxnOffsetCommitRequest) Key() int16 { return 28 } -func (*TxnOffsetCommitRequest) MaxVersion() int16 { return 
3 } +func (*TxnOffsetCommitRequest) MaxVersion() int16 { return 4 } func (v *TxnOffsetCommitRequest) SetVersion(version int16) { v.Version = version } func (v *TxnOffsetCommitRequest) GetVersion() int16 { return v.Version } func (v *TxnOffsetCommitRequest) IsFlexible() bool { return v.Version >= 3 } @@ -23143,7 +23392,7 @@ type TxnOffsetCommitResponse struct { } func (*TxnOffsetCommitResponse) Key() int16 { return 28 } -func (*TxnOffsetCommitResponse) MaxVersion() int16 { return 3 } +func (*TxnOffsetCommitResponse) MaxVersion() int16 { return 4 } func (v *TxnOffsetCommitResponse) SetVersion(version int16) { v.Version = version } func (v *TxnOffsetCommitResponse) GetVersion() int16 { return v.Version } func (v *TxnOffsetCommitResponse) IsFlexible() bool { return v.Version >= 3 } @@ -43847,9 +44096,16 @@ type ListTransactionsRequest struct { // The producer IDs to filter by: if empty, all transactions will be // returned; if non-empty, only transactions which match one of the filtered - // producer IDs will be returned + // producer IDs will be returned. ProducerIDFilters []int64 + // Duration (in millis) to filter by: if < 0, all transactions will be + // returned; otherwise, only transactions running longer than this duration + // will be returned. + // + // This field has a default of -1. + DurationFilterMillis int64 // v1+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. 
UnknownTags Tags } @@ -43907,6 +44163,10 @@ func (v *ListTransactionsRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendInt64(dst, v) } } + if version >= 1 { + v := v.DurationFilterMillis + dst = kbin.AppendInt64(dst, v) + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -43989,6 +44249,10 @@ func (v *ListTransactionsRequest) readFrom(src []byte, unsafe bool) error { v = a s.ProducerIDFilters = v } + if version >= 1 { + v := b.Int64() + s.DurationFilterMillis = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -44006,6 +44270,7 @@ func NewPtrListTransactionsRequest() *ListTransactionsRequest { // Default sets any default fields. Calling this allows for future compatibility // if new fields are added to ListTransactionsRequest. func (v *ListTransactionsRequest) Default() { + v.DurationFilterMillis = -1 } // NewListTransactionsRequest returns a default ListTransactionsRequest @@ -45327,6 +45592,1063 @@ func NewConsumerGroupHeartbeatResponse() ConsumerGroupHeartbeatResponse { return v } +// Assignment contains consumer group assignments. +type Assignment struct { + // The topics & partitions assigned to the member. + TopicPartitions []AssignmentTopicPartition + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to Assignment. +func (v *Assignment) Default() { +} + +// NewAssignment returns a default Assignment +// This is a shortcut for creating a struct and calling Default yourself. +func NewAssignment() Assignment { + var v Assignment + v.Default() + return v +} + +// ConsumerGroupDescribe is a part of KIP-848; this is the +// "next generation" equivalent of DescribeGroups. +type ConsumerGroupDescribeRequest struct { + // Version is the version of this message used with a Kafka broker. 
+ Version int16 + + // The IDs of the groups to describe. + Groups []string + + // Whether to include authorized operations. + IncludeAuthorizedOperations bool + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +func (*ConsumerGroupDescribeRequest) Key() int16 { return 69 } +func (*ConsumerGroupDescribeRequest) MaxVersion() int16 { return 0 } +func (v *ConsumerGroupDescribeRequest) SetVersion(version int16) { v.Version = version } +func (v *ConsumerGroupDescribeRequest) GetVersion() int16 { return v.Version } +func (v *ConsumerGroupDescribeRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *ConsumerGroupDescribeRequest) ResponseKind() Response { + r := &ConsumerGroupDescribeResponse{Version: v.Version} + r.Default() + return r +} + +// RequestWith is requests v on r and returns the response or an error. +// For sharded requests, the response may be merged and still return an error. +// It is better to rely on client.RequestSharded than to rely on proper merging behavior. 
+func (v *ConsumerGroupDescribeRequest) RequestWith(ctx context.Context, r Requestor) (*ConsumerGroupDescribeResponse, error) { + kresp, err := r.Request(ctx, v) + resp, _ := kresp.(*ConsumerGroupDescribeResponse) + return resp, err +} + +func (v *ConsumerGroupDescribeRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.Groups + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } + { + v := v.IncludeAuthorizedOperations + dst = kbin.AppendBool(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + return dst +} + +func (v *ConsumerGroupDescribeRequest) ReadFrom(src []byte) error { + return v.readFrom(src, false) +} + +func (v *ConsumerGroupDescribeRequest) UnsafeReadFrom(src []byte) error { + return v.readFrom(src, true) +} + +func (v *ConsumerGroupDescribeRequest) readFrom(src []byte, unsafe bool) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := s.Groups + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) 
+ } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.Groups = v + } + { + v := b.Bool() + s.IncludeAuthorizedOperations = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + return b.Complete() +} + +// NewPtrConsumerGroupDescribeRequest returns a pointer to a default ConsumerGroupDescribeRequest +// This is a shortcut for creating a new(struct) and calling Default yourself. +func NewPtrConsumerGroupDescribeRequest() *ConsumerGroupDescribeRequest { + var v ConsumerGroupDescribeRequest + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeRequest. +func (v *ConsumerGroupDescribeRequest) Default() { +} + +// NewConsumerGroupDescribeRequest returns a default ConsumerGroupDescribeRequest +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeRequest() ConsumerGroupDescribeRequest { + var v ConsumerGroupDescribeRequest + v.Default() + return v +} + +type ConsumerGroupDescribeResponseGroupMember struct { + // The member ID. + MemberID string + + // The member instance ID, if any. + InstanceID *string + + // The member rack ID, if any. + RackID *string + + // The current member epoch. + MemberEpoch int32 + + // The client ID. + ClientID string + + // The client host. + ClientHost string + + // The subscribed topic names. + SubscribedTopics []string + + // The subscribed topic regex, if any. + SubscribedTopicRegex *string + + // The current assignment. + Assignment Assignment + + // The target assignment. + TargetAssignment Assignment + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. 
Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeResponseGroupMember. +func (v *ConsumerGroupDescribeResponseGroupMember) Default() { + { + v := &v.Assignment + _ = v + } + { + v := &v.TargetAssignment + _ = v + } +} + +// NewConsumerGroupDescribeResponseGroupMember returns a default ConsumerGroupDescribeResponseGroupMember +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeResponseGroupMember() ConsumerGroupDescribeResponseGroupMember { + var v ConsumerGroupDescribeResponseGroupMember + v.Default() + return v +} + +type ConsumerGroupDescribeResponseGroup struct { + // ErrorCode is the error for this response. + // + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + ErrorCode int16 + + // A supplementary message if this errored. + ErrorMessage *string + + // The group ID. + Group string + + // The group state. + State string + + // The group epoch. + Epoch int32 + + // The assignment epoch. + AssignmentEpoch int32 + + // The selected assignor. + AssignorName string + + // Members of the group. + Members []ConsumerGroupDescribeResponseGroupMember + + // 32 bit bitfield representing authorized operations for the group. + // + // This field has a default of -2147483648. + AuthorizedOperations int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeResponseGroup. 
+func (v *ConsumerGroupDescribeResponseGroup) Default() { + v.AuthorizedOperations = -2147483648 +} + +// NewConsumerGroupDescribeResponseGroup returns a default ConsumerGroupDescribeResponseGroup +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeResponseGroup() ConsumerGroupDescribeResponseGroup { + var v ConsumerGroupDescribeResponseGroup + v.Default() + return v +} + +// ConsumerGroupDescribeResponse is returned from a ConsumerGroupDescribeRequest. +type ConsumerGroupDescribeResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + Groups []ConsumerGroupDescribeResponseGroup + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags +} + +func (*ConsumerGroupDescribeResponse) Key() int16 { return 69 } +func (*ConsumerGroupDescribeResponse) MaxVersion() int16 { return 0 } +func (v *ConsumerGroupDescribeResponse) SetVersion(version int16) { v.Version = version } +func (v *ConsumerGroupDescribeResponse) GetVersion() int16 { return v.Version } +func (v *ConsumerGroupDescribeResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *ConsumerGroupDescribeResponse) Throttle() (int32, bool) { + return v.ThrottleMillis, v.Version >= 0 +} + +func (v *ConsumerGroupDescribeResponse) SetThrottle(throttleMillis int32) { + v.ThrottleMillis = throttleMillis +} + +func (v *ConsumerGroupDescribeResponse) RequestKind() Request { + return &ConsumerGroupDescribeRequest{Version: v.Version} +} + +func (v *ConsumerGroupDescribeResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.Groups + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else 
{ + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.ErrorMessage + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.Group + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.State + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Epoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.AssignmentEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.AssignorName + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Members + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.MemberID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.InstanceID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.RackID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.MemberEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ClientID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.ClientHost + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.SubscribedTopics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + if isFlexible { + dst = 
kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + } + { + v := v.SubscribedTopicRegex + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := &v.Assignment + { + v := v.TopicPartitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + { + v := &v.TargetAssignment + { + v := v.TopicPartitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TopicID + dst = kbin.AppendUuid(dst, v) + } + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = 
v.UnknownTags.AppendEach(dst) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + { + v := v.AuthorizedOperations + dst = kbin.AppendInt32(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + return dst +} + +func (v *ConsumerGroupDescribeResponse) ReadFrom(src []byte) error { + return v.readFrom(src, false) +} + +func (v *ConsumerGroupDescribeResponse) UnsafeReadFrom(src []byte) error { + return v.readFrom(src, true) +} + +func (v *ConsumerGroupDescribeResponse) readFrom(src []byte, unsafe bool) error { + v.Default() + b := kbin.Reader{Src: src} + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := s.Groups + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ConsumerGroupDescribeResponseGroup, l)...) 
+ } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int16() + s.ErrorCode = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.ErrorMessage = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Group = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.State = v + } + { + v := b.Int32() + s.Epoch = v + } + { + v := b.Int32() + s.AssignmentEpoch = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.AssignorName = v + } + { + v := s.Members + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]ConsumerGroupDescribeResponseGroupMember, l)...) 
+ } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.MemberID = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.InstanceID = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.RackID = v + } + { + v := b.Int32() + s.MemberEpoch = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.ClientID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.ClientHost = v + } + { + v := s.SubscribedTopics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]string, l)...) 
+ } + for i := int32(0); i < l; i++ { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + a[i] = v + } + v = a + s.SubscribedTopics = v + } + { + var v *string + if isFlexible { + if unsafe { + v = b.UnsafeCompactNullableString() + } else { + v = b.CompactNullableString() + } + } else { + if unsafe { + v = b.UnsafeNullableString() + } else { + v = b.NullableString() + } + } + s.SubscribedTopicRegex = v + } + { + v := &s.Assignment + v.Default() + s := v + { + v := s.TopicPartitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AssignmentTopicPartition, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Uuid() + s.TopicID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) 
+ } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.TopicPartitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + { + v := &s.TargetAssignment + v.Default() + s := v + { + v := s.TopicPartitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AssignmentTopicPartition, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Uuid() + s.TopicID = v + } + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.TopicPartitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Members = v + } + { + v := b.Int32() + s.AuthorizedOperations = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Groups = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + return b.Complete() +} + +// NewPtrConsumerGroupDescribeResponse returns a pointer to a default ConsumerGroupDescribeResponse +// This is a shortcut for creating a new(struct) and calling Default yourself. 
+func NewPtrConsumerGroupDescribeResponse() *ConsumerGroupDescribeResponse { + var v ConsumerGroupDescribeResponse + v.Default() + return &v +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to ConsumerGroupDescribeResponse. +func (v *ConsumerGroupDescribeResponse) Default() { +} + +// NewConsumerGroupDescribeResponse returns a default ConsumerGroupDescribeResponse +// This is a shortcut for creating a struct and calling Default yourself. +func NewConsumerGroupDescribeResponse() ConsumerGroupDescribeResponse { + var v ConsumerGroupDescribeResponse + v.Default() + return v +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -45471,6 +46793,8 @@ func RequestForKey(key int16) Request { return NewPtrAllocateProducerIDsRequest() case 68: return NewPtrConsumerGroupHeartbeatRequest() + case 69: + return NewPtrConsumerGroupDescribeRequest() } } @@ -45618,6 +46942,8 @@ func ResponseForKey(key int16) Response { return NewPtrAllocateProducerIDsResponse() case 68: return NewPtrConsumerGroupHeartbeatResponse() + case 69: + return NewPtrConsumerGroupDescribeResponse() } } @@ -45765,6 +47091,8 @@ func NameForKey(key int16) string { return "AllocateProducerIDs" case 68: return "ConsumerGroupHeartbeat" + case 69: + return "ConsumerGroupDescribe" } } @@ -45841,6 +47169,7 @@ const ( ListTransactions Key = 66 AllocateProducerIDs Key = 67 ConsumerGroupHeartbeat Key = 68 + ConsumerGroupDescribe Key = 69 ) // Name returns the name for this key. 
diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 3081c346..2267b919 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -67,6 +67,7 @@ var versions = []struct { {"v3.5", V3_5_0()}, {"v3.6", V3_6_0()}, {"v3.7", V3_7_0()}, + {"v3.8", V3_8_0()}, } // VersionStrings returns all recognized versions, minus any patch, that can be @@ -333,7 +334,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { // // TODO: add introduced-version to differentiate some specific // keys. - skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67}, + skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67, 74, 75}, } for _, opt := range opts { opt.apply(&cfg) @@ -378,6 +379,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { {max350, "v3.5"}, {max360, "v3.6"}, {max370, "v3.7"}, + {max380, "v3.8"}, } { for k, v := range comparison.cmp.filter(cfg.listener) { if v == -1 { @@ -520,6 +522,7 @@ func V3_4_0() *Versions { return zkBrokerOf(max340) } func V3_5_0() *Versions { return zkBrokerOf(max350) } func V3_6_0() *Versions { return zkBrokerOf(max360) } func V3_7_0() *Versions { return zkBrokerOf(max370) } +func V3_8_0() *Versions { return zkBrokerOf(max380) } func zkBrokerOf(lks listenerKeys) *Versions { return &Versions{lks.filter(zkBroker)} @@ -1158,8 +1161,37 @@ var max370 = nextMax(max360, func(v listenerKeys) listenerKeys { return v }) +var max380 = nextMax(max370, func(v listenerKeys) listenerKeys { + // KAFKA-16314 2e8d69b78ca52196decd851c8520798aa856c073 KIP-890 + // Then error rename in cf1ba099c0723f9cf65dda4cd334d36b7ede6327 + v[0].inc() // 11 produce + v[10].inc() // 5 find coordinator + v[22].inc() // 5 init producer id + v[24].inc() // 5 add partitions to txn + v[25].inc() // 4 add offsets to txn + v[26].inc() // 4 end txn + v[28].inc() // 4 txn offset commit + + // KAFKA-15460 68745ef21a9d8fe0f37a8c5fbc7761a598718d46 KIP-848 + v[16].inc() // 5 list groups 
+	// KAFKA-14509 90e646052a17e3f6ec1a013d76c1e6af2fbb756e KIP-848 added + // 7b0352f1bd9b923b79e60b18b40f570d4bfafcc0 + // b7c99e22a77392d6053fe231209e1de32b50a98b + // 68389c244e720566aaa8443cd3fc0b9d2ec4bb7a + // 5f410ceb04878ca44d2d007655155b5303a47907 stabilized + v = append(v, + k(zkBroker, rBroker), // 69 consumer group describe + ) + + // KAFKA-16265 b4e96913cc6c827968e47a31261e0bd8fdf677b5 KIP-994 (part 1) + v[66].inc() + + return v +}) + var ( - maxStable = max370 + maxStable = max380 maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { return v }) diff --git a/pkg/sr/client.go b/pkg/sr/client.go index c2e9b2ce..1bdc0e21 100644 --- a/pkg/sr/client.go +++ b/pkg/sr/client.go @@ -34,6 +34,8 @@ type ResponseError struct { Method string `json:"-"` // URL is the full path that was requested that resulted in this error. URL string `json:"-"` + // StatusCode is the status code that was returned for this error. + StatusCode int `json:"-"` // Raw contains the raw response body. Raw []byte `json:"-"` @@ -55,6 +57,7 @@ type Client struct { httpcl *http.Client ua string defParams Param + opts []ClientOpt basicAuth *struct { user string @@ -69,6 +72,7 @@ func NewClient(opts ...ClientOpt) (*Client, error) { urls: []string{"http://localhost:8081"}, httpcl: &http.Client{Timeout: 5 * time.Second}, ua: "franz-go", + opts: opts, } for _, opt := range opts { @@ -82,6 +86,13 @@ func NewClient(opts ...ClientOpt) (*Client, error) { return cl, nil } +// Opts returns the options that were used to create this client. This can be used +// as a base to generate a new client, where you can add override options to +// the end of the original input list. 
+func (cl *Client) Opts() []ClientOpt { + return cl.opts +} + func (cl *Client) get(ctx context.Context, path string, into any) error { return cl.do(ctx, http.MethodGet, path, nil, into) } @@ -149,9 +160,13 @@ start: if resp.StatusCode >= 300 { e := &ResponseError{ - Method: method, - URL: reqURL, - Raw: body, + Method: method, + URL: reqURL, + StatusCode: resp.StatusCode, + Raw: bytes.TrimSpace(body), + } + if len(e.Raw) == 0 { + e.Message = "no response" } _ = json.Unmarshal(body, e) // best effort return e diff --git a/pkg/sr/go.mod b/pkg/sr/go.mod index 7bfe2563..96924dab 100644 --- a/pkg/sr/go.mod +++ b/pkg/sr/go.mod @@ -1,5 +1,5 @@ module github.com/twmb/franz-go/pkg/sr -go 1.21 +go 1.22 toolchain go1.22.0