diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 92ebeaa3..c7bf6963 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -1113,8 +1113,8 @@ func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt { // For Kafka-to-Kafka transactions, the transactional ID is only one half of // the equation. You must also assign a group to consume from. // -// To produce transactionally, you first BeginTransaction, then produce records -// consumed from a group, then you EndTransaction. All records produced outside +// To produce transactionally, you first [BeginTransaction], then produce records +// consumed from a group, then you [EndTransaction]. All records produced outside // of a transaction will fail immediately with an error. // // After producing a batch, you must commit what you consumed. Auto committing @@ -1449,7 +1449,7 @@ func Balancers(balancers ...GroupBalancer) GroupOpt { // in this timeout, the broker will remove the member from the group and // initiate a rebalance. // -// If you are using a GroupTransactSession for EOS, wish to lower this, and are +// If you are using a [GroupTransactSession] for EOS, wish to lower this, and are // talking to a Kafka cluster pre 2.5, consider lowering the // TransactionTimeout. If you do not, you risk a transaction finishing after a // group has rebalanced, which could lead to duplicate processing. 
If you are diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 33cac641..515b6c63 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -504,6 +504,7 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti failing: mp.loadErr != 0, sink: mp.sns.sink, topicPartitionData: td, + lastAckedOffset: -1, } } else { p.cursor = &cursor{ diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index fa66e304..2b96d916 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -859,6 +859,20 @@ func (s *sink) handleReqRespBatch( // handling, but KIP-360 demonstrated that resetting sequence // numbers is fundamentally unsafe, so we treat it like OOOSN. // + // KAFKA-5793 specifically mentions for OOOSN "when you get it, + // it should always mean data loss". Sometime after KIP-360, + // Kafka changed the client to remove all places + // UnknownProducerID was returned, and then started referring + // to OOOSN as retryable. KIP-890 definitively says OOOSN is + // retryable. However, the Kafka source as of 24-10-10 still + // only retries OOOSN for batches that are NOT the expected + // next batch (i.e., it's next + 1, for when there are multiple + // in flight). With KIP-890, we still just disregard whatever + // supposedly non-retryable / actually-is-retryable error is + // returned if the LogStartOffset is _after_ what we previously + // produced. Specifically, this is step (4) in the wiki link + // within KAFKA-5793. + // // InvalidMapping is similar to UnknownProducerID, but occurs // when the txnal coordinator timed out our transaction. // @@ -886,6 +900,22 @@ func (s *sink) handleReqRespBatch( // txn coordinator requests, which have PRODUCER_FENCED vs // TRANSACTION_TIMED_OUT. 
+ if batch.owner.lastAckedOffset >= 0 && rp.LogStartOffset > batch.owner.lastAckedOffset { + s.cl.cfg.logger.Log(LogLevelInfo, "partition prefix truncation to after our last produce caused the broker to forget us; no loss occurred, bumping producer epoch and resetting sequence numbers", + "broker", logID(s.nodeID), + "topic", topic, + "partition", rp.Partition, + "producer_id", producerID, + "producer_epoch", producerEpoch, + "err", err, + ) + s.cl.failProducerID(producerID, producerEpoch, errReloadProducerID) + if debug { + fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", rp.BaseOffset, nrec, err) + } + return true, false + } + if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss { s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID", "broker", logID(s.nodeID), @@ -956,6 +986,7 @@ func (s *sink) handleReqRespBatch( ) } else { batch.owner.okOnSink = true + batch.owner.lastAckedOffset = rp.BaseOffset + int64(len(batch.records)) } s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err) didProduce = err == nil @@ -1227,6 +1258,8 @@ type recBuf struct { // to drain. inflight uint8 + lastAckedOffset int64 // last ProduceResponse's BaseOffset + how many records we produced + topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated) // seq is used for the seq in each record batch. It is incremented when