Merge pull request #628 from twmb/621
consuming: reset to nearest if we receive OOOR while fetching
twmb authored Dec 6, 2023
2 parents 5076659 + e6ed69f commit 3134cb2
Showing 1 changed file with 32 additions and 4 deletions.
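For context before the diff: ConsumeResetOffset previously governed every OFFSET_OUT_OF_RANGE reset. With this change it applies only when nothing has been consumed yet; once a record has been consumed, an OFFSET_OUT_OF_RANGE resets to the offset nearest the last consumed record's timestamp (built with NewOffset().AfterMilli, as the diff below shows). A minimal consumer sketch against the public franz-go API; the broker address and topic name are illustrative:

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        cl, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // illustrative address
            kgo.ConsumeTopics("events"),       // illustrative topic
            // Used when there is no prior consumed offset. After this
            // commit, a mid-stream OFFSET_OUT_OF_RANGE no longer falls
            // back here; it resets to the nearest offset after the last
            // consumed record instead.
            kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
        )
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        fetches := cl.PollFetches(context.Background())
        fetches.EachRecord(func(r *kgo.Record) {
            fmt.Println(string(r.Value))
        })
    }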
pkg/kgo/source.go (32 additions, 4 deletions)
@@ -152,6 +152,11 @@ type cursorOffset struct {
     // details.
     lastConsumedEpoch int32
 
+    // If we receive OFFSET_OUT_OF_RANGE, and we previously *know* we
+    // consumed an offset, we reset to the nearest offset after our prior
+    // known valid consumed offset.
+    lastConsumedTime time.Time
+
     // The current high watermark of the partition. Uninitialized (0) means
     // we do not know the HWM, or there is no lag.
     hwm int64
@@ -506,6 +511,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
         pCursor.from.setOffset(cursorOffset{
             offset:            lastReturnedRecord.Offset + 1,
             lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
+            lastConsumedTime:  lastReturnedRecord.Timestamp,
             hwm:               p.HighWatermark,
         })
     }
@@ -1067,23 +1073,44 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
     // In all cases except case 4, we also have to check if
     // no reset offset was configured. If so, we ignore
     // trying to reset and instead keep our failed partition.
-    addList := func(replica int32) {
+    addList := func(replica int32, log bool) {
         if s.cl.cfg.resetOffset.noReset {
             keep = true
+        } else if !partOffset.from.lastConsumedTime.IsZero() {
+            reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
+                replica: replica,
+                Offset:  NewOffset().AfterMilli(partOffset.from.lastConsumedTime.UnixMilli()),
+            })
+            if log {
+                s.cl.cfg.logger.Log(LogLevelWarn, "received OFFSET_OUT_OF_RANGE, resetting to the nearest offset; either you were consuming too slowly and the broker has deleted the segment you were in the middle of consuming, or the broker has lost data and has not yet transferred leadership",
+                    "broker", logID(s.nodeID),
+                    "topic", topic,
+                    "partition", partition,
+                    "prior_offset", partOffset.offset,
+                )
+            }
         } else {
             reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
                 replica: replica,
                 Offset:  s.cl.cfg.resetOffset,
             })
+            if log {
+                s.cl.cfg.logger.Log(LogLevelInfo, "received OFFSET_OUT_OF_RANGE on the first fetch, resetting to the configured ConsumeResetOffset",
+                    "broker", logID(s.nodeID),
+                    "topic", topic,
+                    "partition", partition,
+                    "prior_offset", partOffset.offset,
+                )
+            }
         }
     }
 
     switch {
     case s.nodeID == partOffset.from.leader: // non KIP-392 case
-        addList(-1)
+        addList(-1, true)
 
     case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
-        addList(s.nodeID)
+        addList(s.nodeID, false)
 
     default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
         if kip320 {
@@ -1098,7 +1125,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
             // If the broker does not support offset for leader epoch but
             // does support follower fetching for some reason, we have to
             // fallback to listing.
-            addList(-1)
+            addList(-1, true)
         }
     }
 
@@ -1630,6 +1657,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a
     // topic is compacted.
     o.offset = record.Offset + 1
     o.lastConsumedEpoch = record.LeaderEpoch
+    o.lastConsumedTime = record.Timestamp
 }
 
 ///////////////////////////////

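Taken in isolation, the new addList closure encodes a three-way decision: keep the failed partition when resets are disabled, list the offset nearest the prior consumed timestamp when one exists, or fall back to the configured ConsumeResetOffset on a first fetch. A standalone sketch of that decision; decideReset, resetDecision, and the package name are illustrative, not franz-go internals:

    package sketch

    import "time"

    type resetDecision int

    const (
        // No reset configured: surface the failed partition to the user.
        keepFailedPartition resetDecision = iota
        // We know we consumed something: list the offset nearest after
        // the last consumed record's timestamp.
        listNearestAfterPrior
        // First fetch, nothing consumed yet: use the configured reset.
        listConfiguredReset
    )

    // decideReset mirrors the branch order of the patched addList closure.
    func decideReset(noReset bool, lastConsumedTime time.Time) resetDecision {
        switch {
        case noReset:
            return keepFailedPartition
        case !lastConsumedTime.IsZero():
            return listNearestAfterPrior
        default:
            return listConfiguredReset
        }
    }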