From f372bc3a0e9a0e35a4a5dba3d423a12984c4169b Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Oct 2024 18:36:28 -0600 Subject: [PATCH] Merge pull request #794 from twmb/790 kgo: add TopicID to the FetchTopic type --- pkg/kgo/record_and_fetch.go | 4 ++++ pkg/kgo/source.go | 1 + 2 files changed, 5 insertions(+) diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 538d9a9a..45720666 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -312,6 +312,9 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) { type FetchTopic struct { // Topic is the topic this is for. Topic string + // TopicID is the ID of the topic, if your cluster supports returning + // topic IDs in fetch responses (Kafka 3.1+). + TopicID [16]byte // Partitions contains individual partitions in the topic that were // fetched. Partitions []FetchPartition @@ -598,6 +601,7 @@ func (fs Fetches) EachTopic(fn func(FetchTopic)) { for topic, partitions := range topics { fn(FetchTopic{ topic, + [16]byte{}, partitions, }) } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 51f87374..74070b1c 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1115,6 +1115,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe fetchTopic := FetchTopic{ Topic: topic, + TopicID: rt.TopicID, Partitions: make([]FetchPartition, 0, len(rt.Partitions)), }