From 2eed36e1587addedb858dd7b56dc5aa8f0939bde Mon Sep 17 00:00:00 2001 From: Chris Roche Date: Tue, 22 Oct 2024 16:01:18 -0700 Subject: [PATCH] Fix handling of invalid base offsets --- pkg/kgo/producer.go | 8 +++++++- pkg/kgo/source.go | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 1ea2a29f..dffb25aa 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -553,7 +553,13 @@ start: p.promisesMu.Lock() for i, pr := range b.recs { pr.LeaderEpoch = 0 - pr.Offset = b.baseOffset + int64(i) + if b.baseOffset == -1 { + // if the base offset is invalid/unknown (-1), all record offsets should + // be treated as unknown + pr.Offset = -1 + } else { + pr.Offset = b.baseOffset + int64(i) + } pr.Partition = b.partition pr.ProducerID = b.pid pr.ProducerEpoch = b.epoch diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 12732e90..131d6768 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1773,7 +1773,11 @@ func recordToRecord( ProducerID: batch.ProducerID, ProducerEpoch: batch.ProducerEpoch, LeaderEpoch: batch.PartitionLeaderEpoch, - Offset: batch.FirstOffset + int64(record.OffsetDelta), + } + if batch.FirstOffset == -1 { + r.Offset = -1 + } else { + r.Offset = batch.FirstOffset + int64(record.OffsetDelta) } if r.Attrs.TimestampType() == 0 { r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)