Skip to content

Commit

Permalink
add NewOffsetFromRecord helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
noamcohen97 committed Aug 28, 2024
1 parent b77dd13 commit 79e497b
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/kadm/kadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ type Offset struct {
Metadata string // Metadata, if non-empty, is used for offset commits.
}

// NewOffsetFromRecord is a helper to create an Offset for a given Record
func NewOffsetFromRecord(record *kgo.Record) Offset {
return Offset{
Topic: record.Topic,
Partition: record.Partition,
At: record.Offset + 1,
LeaderEpoch: record.LeaderEpoch,
}
}

// Partitions wraps many partitions.
type Partitions []Partition

Expand Down Expand Up @@ -372,7 +382,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets {
return
}
r := p.Records[len(p.Records)-1]
os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
os.Add(NewOffsetFromRecord(r))
})
return os
}
Expand All @@ -383,7 +393,7 @@ func OffsetsFromFetches(fs kgo.Fetches) Offsets {
func OffsetsFromRecords(rs ...kgo.Record) Offsets {
os := make(Offsets)
for _, r := range rs {
os.AddOffset(r.Topic, r.Partition, r.Offset+1, r.LeaderEpoch)
os.Add(NewOffsetFromRecord(&r))
}
return os
}
Expand Down

0 comments on commit 79e497b

Please sign in to comment.