Skip to content

Commit

Permalink
Merge pull request #721 from twmb/kafka-3.7
Browse files Browse the repository at this point in the history
Kafka 3.7 proto
  • Loading branch information
twmb authored May 9, 2024
2 parents 8b53958 + f5106ae commit c77d58e
Show file tree
Hide file tree
Showing 7 changed files with 1,409 additions and 22 deletions.
18 changes: 17 additions & 1 deletion generate/definitions/00_produce
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// Note that the special client ID "__admin_client" will allow you to produce
// records to internal topics. This is generally recommended if you want to
// break your Kafka cluster.
ProduceRequest => key 0, max version 9, flexible v9+
ProduceRequest => key 0, max version 10, flexible v9+
// TransactionID is the transaction ID to use for this request, allowing for
// exactly once semantics.
TransactionID: nullable-string // v3+
Expand Down Expand Up @@ -145,4 +145,20 @@ ProduceResponse =>
// ErrorMessage is the global error message of of what caused this batch
// to error.
ErrorMessage: nullable-string // v8+
CurrentLeader: => // tag 0
// The ID of the current leader, or -1 if unknown.
LeaderID: int32(-1)
// The latest known leader epoch.
LeaderEpoch: int32(-1)
ThrottleMillis(6) // v1+
// Brokers is present if any partition responses contain the error
// NOT_LEADER_OR_FOLLOWER.
Brokers: [=>] // tag 0
// NodeID is the node ID of a Kafka broker.
NodeID: int32
// Host is the hostname of a Kafka broker.
Host: string
// Port is the port of a Kafka broker.
Port: int32
// Rack is the rack this Kafka broker is in.
Rack: nullable-string
13 changes: 12 additions & 1 deletion generate/definitions/01_fetch
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and
// the ReplicaID, and deprecates the old ReplicaID (KIP-903).
FetchRequest => key 1, max version 15, flexible v12+
FetchRequest => key 1, max version 16, flexible v12+
// The cluster ID, if known. This is used to validate metadata fetches
// prior to broker registration.
ClusterID: nullable-string(null) // tag 0
Expand Down Expand Up @@ -229,3 +229,14 @@ FetchResponse =>
// Starting v4, this transitioned to the RecordBatch format (thus this
// contains many RecordBatch structs).
RecordBatches: nullable-bytes
// Brokers is present if any partition responses contain the error
// NOT_LEADER_OR_FOLLOWER.
Brokers: [=>] // tag 0
// NodeID is the node ID of a Kafka broker.
NodeID: int32
// Host is the hostname of a Kafka broker.
Host: string
// Port is the port of a Kafka broker.
Port: int32
// Rack is the rack this Kafka broker is in.
Rack: nullable-string
2 changes: 1 addition & 1 deletion generate/definitions/08_offset_commit
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// OffsetCommitRequest commits offsets for consumed topics / partitions in
// a group.
OffsetCommitRequest => key 8, max version 8, flexible v8+, group coordinator
OffsetCommitRequest => key 8, max version 9, flexible v8+, group coordinator
// Group is the group this request is committing offsets to.
Group: string
// Generation being -1 and group being empty means the group is being used
Expand Down
6 changes: 5 additions & 1 deletion generate/definitions/09_offset_fetch
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// OffsetFetchRequest requests the most recent committed offsets for topic
// partitions in a group.
OffsetFetchRequest => key 9, max version 8, flexible v6+, group coordinator
OffsetFetchRequest => key 9, max version 9, flexible v6+, group coordinator
// Group is the group to fetch offsets for.
Group: string // v0-v7
// Topics contains topics to fetch offets for. Version 2+ allows this to be
Expand All @@ -17,6 +17,10 @@ OffsetFetchRequest => key 9, max version 8, flexible v6+, group coordinator
// are left undocumented. Refer to the top level documentation if necessary.
Groups: [=>] // v8+
Group: string
// The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848).
MemberID: nullable-string // v9+
// The member epoch if using the new consumer protocol (KIP-848).
MemberEpoch: int32(-1) // v9+
Topics: nullable[=>]
Topic: string
Partitions: [int32]
Expand Down
6 changes: 5 additions & 1 deletion generate/definitions/60_describe_cluster
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Introduced for KIP-700, DescribeClusterRequest is effectively an "admin"
// type metadata request for information that producers or consumers do not
// need to care about.
DescribeClusterRequest => key 60, max version 0, flexible v0+
DescribeClusterRequest => key 60, max version 1, flexible v0+
// Whether to include cluster authorized operations. This requires DESCRIBE
// on CLUSTER.
IncludeClusterAuthorizedOperations: bool
// The endpoint type to describe. 1=brokers, 2=controllers.
EndpointType: int8(1) // v1+

// DescribeClusterResponse is a response to a DescribeClusterRequest.
DescribeClusterResponse =>
Expand All @@ -13,6 +15,8 @@ DescribeClusterResponse =>
ErrorCode: int16
// The top level error message, if any.
ErrorMessage: nullable-string
// The endpoint type that was described. 1=brokers, 2=controllers.
EndpointType: int8(1) // v1+
// The cluster ID that responding broker belongs to.
ClusterID: string
// The ID of the controller broker.
Expand Down
61 changes: 61 additions & 0 deletions generate/definitions/68_consumer_group_heartbeat
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// ConsumerGroupHeartbeat is a part of KIP-848; there are a lot of details
// to this request so documentation is left to the KIP itself.
ConsumerGroupHeartbeatRequest => key 68, max version 0, flexible v0+
// The group ID.
Group: string
// The member ID generated by the coordinator. This must be kept during
// the entire lifetime of the member.
MemberID: string
// The current member epoch; 0 to join the group, -1 to leave, -2 to
// indicate that the static member will rejoin.
MemberEpoch: int32
// Instance ID of the member; null if not provided or if unchanging.
InstanceID: nullable-string
// The rack ID of the member; null if not provided or if unchanging.
RackID: nullable-string
// RebalanceTimeoutMillis is how long the coordinator will wait on a member
// to revoke its partitions. -1 if unchanging.
RebalanceTimeoutMillis: int32(-1)
// Subscribed topics; null if unchanging.
SubscribedTopicNames: nullable[string]
// The server side assignor to use; null if unchanging.
ServerAssignor: nullable-string
// Topic partitions owned by the member; null if unchanging.
Topics: nullable[=>]
// The topic ID.
TopicID: uuid
// The partitions.
Partitions: [int32]

// ConsumerGroupHeartbeatResponse is returned from a ConsumerGroupHeartbeatRequest.
ConsumerGroupHeartbeatResponse =>
ThrottleMillis
// ErrorCode is the error for this response.
//
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_MEMBER_ID (version 0+)
// - FENCED_MEMBER_EPOCH (version 0+)
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
ErrorCode: int16
// A supplementary message if this errored.
ErrorMessage: nullable-string
// The member ID generated by the coordinator; provided when joining
// with MemberEpoch=0.
MemberID: nullable-string
// The member epoch.
MemberEpoch: int32
// The heartbeat interval, in milliseconds.
HeartbeatIntervalMillis: int32
// The assignment; null if not provided.
Assignment: nullable=>
// The topics partitions that can be used immediately.
Topics: [=>]
TopicID: uuid
Partitions: [int32]
Loading

0 comments on commit c77d58e

Please sign in to comment.