From 3548d1f7ab535037612707999bbfa73f1e2672f2 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 10 Oct 2024 17:34:55 -0600 Subject: [PATCH] kgo: ignore OOOSN where possible See embedded comment. This preceeds handling KIP-890. Closes #805. --- pkg/kgo/metadata.go | 1 + pkg/kgo/sink.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 33cac641..515b6c63 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -504,6 +504,7 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti failing: mp.loadErr != 0, sink: mp.sns.sink, topicPartitionData: td, + lastAckedOffset: -1, } } else { p.cursor = &cursor{ diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 6d0f3dfe..853fb0d2 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -854,6 +854,20 @@ func (s *sink) handleReqRespBatch( // handling, but KIP-360 demonstrated that resetting sequence // numbers is fundamentally unsafe, so we treat it like OOOSN. // + // KAFKA-5793 specifically mentions for OOOSN "when you get it, + // it should always mean data loss". Sometime after KIP-360, + // Kafka changed the client to remove all places + // UnknownProducerID was returned, and then started referring + // to OOOSN as retryable. KIP-890 definitively says OOOSN is + // retryable. However, the Kafka source as of 24-10-10 still + // only retries OOOSN for batches that are NOT the expected + // next batch (i.e., it's next + 1, for when there are multiple + // in flight). With KIP-890, we still just disregard whatever + // supposedly non-retryable / actually-is-retryable error is + // returned if the LogStartOffset is _after_ what we previously + // produced. Specifically, this is step (4) in in wiki link + // within KAFKA-5793. + // // InvalidMapping is similar to UnknownProducerID, but occurs // when the txnal coordinator timed out our transaction. // @@ -881,6 +895,22 @@ func (s *sink) handleReqRespBatch( // txn coordinator requests, which have PRODUCER_FENCED vs // TRANSACTION_TIMED_OUT. + if batch.owner.lastAckedOffset >= 0 && rp.LogStartOffset > batch.owner.lastAckedOffset { + s.cl.cfg.logger.Log(LogLevelInfo, "partition prefix truncation to after our last produce caused the broker to forget us; no loss occurred, bumping producer epoch and resetting sequence numbers", + "broker", logID(s.nodeID), + "topic", topic, + "partition", rp.Partition, + "producer_id", producerID, + "producer_epoch", producerEpoch, + "err", err, + ) + s.cl.failProducerID(producerID, producerEpoch, errReloadProducerID) + if debug { + fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", rp.BaseOffset, nrec, err) + } + return true, false + } + if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss { s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID", "broker", logID(s.nodeID), @@ -951,6 +981,7 @@ func (s *sink) handleReqRespBatch( ) } else { batch.owner.okOnSink = true + batch.owner.lastAckedOffset = rp.BaseOffset + int64(len(batch.records)) } s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err) didProduce = err == nil @@ -1222,6 +1253,8 @@ type recBuf struct { // to drain. inflight uint8 + lastAckedOffset int64 // last ProduceResponse's BaseOffset + how many records we produced + topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated) // seq is used for the seq in each record batch. It is incremented when