diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 4f1ebe6f..1460af43 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -274,6 +274,9 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) { type FetchTopic struct { // Topic is the topic this is for. Topic string + // TopicID is the ID of the topic, if your cluster supports returning + // topic IDs in fetch responses (Kafka 3.1+). + TopicID [16]byte // Partitions contains individual partitions in the topic that were // fetched. Partitions []FetchPartition @@ -560,6 +563,7 @@ func (fs Fetches) EachTopic(fn func(FetchTopic)) { for topic, partitions := range topics { fn(FetchTopic{ topic, + [16]byte{}, partitions, }) } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0c475d14..12732e90 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1041,6 +1041,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe fetchTopic := FetchTopic{ Topic: topic, + TopicID: rt.TopicID, Partitions: make([]FetchPartition, 0, len(rt.Partitions)), }