From 79e497bfb263fcd87a7bf1fb01385e1ba156690e Mon Sep 17 00:00:00 2001 From: Noam Cohen Date: Wed, 28 Aug 2024 13:41:25 +0300 Subject: [PATCH] add `NewOffsetFromRecord` helper function --- pkg/kadm/kadm.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/kadm/kadm.go b/pkg/kadm/kadm.go index 432fc4c7..93df0868 100644 --- a/pkg/kadm/kadm.go +++ b/pkg/kadm/kadm.go @@ -191,6 +191,16 @@ type Offset struct { Metadata string // Metadata, if non-empty, is used for offset commits. } +// NewOffsetFromRecord is a helper to create an Offset for a given Record +func NewOffsetFromRecord(record *kgo.Record) Offset { + return Offset{ + Topic: record.Topic, + Partition: record.Partition, + At: record.Offset + 1, + LeaderEpoch: record.LeaderEpoch, + } +} + // Partitions wraps many partitions. type Partitions []Partition @@ -372,7 +382,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets { return } r := p.Records[len(p.Records)-1] - os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) + os.Add(NewOffsetFromRecord(r)) }) return os } @@ -383,7 +393,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets { func OffsetsFromRecords(rs ...kgo.Record) Offsets { os := make(Offsets) for _, r := range rs { - os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) + os.Add(NewOffsetFromRecord(&r)) } return os }