diff --git a/pkg/kadm/kadm.go b/pkg/kadm/kadm.go index 432fc4c7..93df0868 100644 --- a/pkg/kadm/kadm.go +++ b/pkg/kadm/kadm.go @@ -191,6 +191,16 @@ type Offset struct { Metadata string // Metadata, if non-empty, is used for offset commits. } +// NewOffsetFromRecord is a helper to create an Offset for a given Record +func NewOffsetFromRecord(record *kgo.Record) Offset { + return Offset{ + Topic: record.Topic, + Partition: record.Partition, + At: record.Offset + 1, + LeaderEpoch: record.LeaderEpoch, + } +} + // Partitions wraps many partitions. type Partitions []Partition @@ -372,7 +382,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets { return } r := p.Records[len(p.Records)-1] - os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) + os.Add(NewOffsetFromRecord(r)) }) return os } @@ -383,7 +393,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets { func OffsetsFromRecords(rs ...kgo.Record) Offsets { os := make(Offsets) for _, r := range rs { - os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch) + os.Add(NewOffsetFromRecord(&r)) } return os }