diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 49c3d042..e1dd5477 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -152,6 +152,11 @@ type cursorOffset struct {
 	// details.
 	lastConsumedEpoch int32
 
+	// If we receive OFFSET_OUT_OF_RANGE, and we previously *know* we
+	// consumed an offset, we reset to the nearest offset after our prior
+	// known valid consumed offset.
+	lastConsumedTime time.Time
+
 	// The current high watermark of the partition. Uninitialized (0) means
 	// we do not know the HWM, or there is no lag.
 	hwm int64
@@ -506,6 +511,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 			pCursor.from.setOffset(cursorOffset{
 				offset:            lastReturnedRecord.Offset + 1,
 				lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
+				lastConsumedTime:  lastReturnedRecord.Timestamp,
 				hwm:               p.HighWatermark,
 			})
 		}
@@ -1067,23 +1073,44 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 			// In all cases except case 4, we also have to check if
 			// no reset offset was configured. If so, we ignore
 			// trying to reset and instead keep our failed partition.
-			addList := func(replica int32) {
+			addList := func(replica int32, log bool) {
 				if s.cl.cfg.resetOffset.noReset {
 					keep = true
+				} else if !partOffset.from.lastConsumedTime.IsZero() {
+					reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
+						replica: replica,
+						Offset:  NewOffset().AfterMilli(partOffset.from.lastConsumedTime.UnixMilli()),
+					})
+					if log {
+						s.cl.cfg.logger.Log(LogLevelWarn, "received OFFSET_OUT_OF_RANGE, resetting to the nearest offset; either you were consuming too slowly and the broker has deleted the segment you were in the middle of consuming, or the broker has lost data and has not yet transferred leadership",
+							"broker", logID(s.nodeID),
+							"topic", topic,
+							"partition", partition,
+							"prior_offset", partOffset.offset,
+						)
+					}
 				} else {
 					reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
 						replica: replica,
 						Offset:  s.cl.cfg.resetOffset,
 					})
+					if log {
+						s.cl.cfg.logger.Log(LogLevelInfo, "received OFFSET_OUT_OF_RANGE on the first fetch, resetting to the configured ConsumeResetOffset",
+							"broker", logID(s.nodeID),
+							"topic", topic,
+							"partition", partition,
+							"prior_offset", partOffset.offset,
+						)
+					}
 				}
 			}
 
 			switch {
 			case s.nodeID == partOffset.from.leader: // non KIP-392 case
-				addList(-1)
+				addList(-1, true)
 
 			case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
-				addList(s.nodeID)
+				addList(s.nodeID, false)
 
 			default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
 				if kip320 {
@@ -1098,7 +1125,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 					// If the broker does not support offset for leader epoch but
 					// does support follower fetching for some reason, we have to
 					// fallback to listing.
-					addList(-1)
+					addList(-1, true)
 				}
 			}
 
@@ -1630,6 +1657,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a
 	// topic is compacted.
 	o.offset = record.Offset + 1
 	o.lastConsumedEpoch = record.LeaderEpoch
+	o.lastConsumedTime = record.Timestamp
 }
 
 ///////////////////////////////
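
For reference only (not part of this diff): the recovery path above builds a timestamp-based offset internally via NewOffset().AfterMilli(lastConsumedTime.UnixMilli()). Below is a minimal sketch of the same construction through the public kgo API. The broker address "localhost:9092", the topic "my-topic", and the lastConsumed timestamp are placeholders chosen for illustration, not values taken from the diff.

// Sketch: configure a consumer to start at the nearest offset after a
// known timestamp, the same kind of Offset the OFFSET_OUT_OF_RANGE
// recovery above derives from cursorOffset.lastConsumedTime.
package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder: pretend this is the timestamp of the last record we
	// successfully consumed.
	lastConsumed := time.Now().Add(-10 * time.Minute)

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		// Start at the nearest offset after lastConsumed's millisecond
		// timestamp.
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(lastConsumed.UnixMilli())),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}

Compared to falling straight back to the configured ConsumeResetOffset, resuming from the last consumed record's timestamp keeps the consumer close to where it left off when retention-based segment deletion (or broker data loss) invalidates the exact offset it was reading.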