kgo: ignore OOOSN where possible #834

Merged · 2 commits · Oct 15, 2024
6 changes: 3 additions & 3 deletions pkg/kgo/config.go
@@ -1113,8 +1113,8 @@ func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt {
// For Kafka-to-Kafka transactions, the transactional ID is only one half of
// the equation. You must also assign a group to consume from.
//
// To produce transactionally, you first BeginTransaction, then produce records
// consumed from a group, then you EndTransaction. All records produced outside
// To produce transactionally, you first [BeginTransaction], then produce records
// consumed from a group, then you [EndTransaction]. All records produced outside
// of a transaction will fail immediately with an error.
//
// After producing a batch, you must commit what you consumed. Auto committing
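The doc comment in this hunk describes the BeginTransaction / produce / EndTransaction flow. Below is a minimal sketch of the produce half of that flow; it is an illustration of the documented kgo API, not code from this PR. The broker address, topic, and transactional ID are placeholders, and the Kafka-to-Kafka case would additionally consume from a group (for example via a GroupTransactSession, touched on in the next hunk).

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker, topic, and transactional ID.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.DefaultProduceTopic("txn-out"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Begin, produce, end. Records produced outside of a transaction would
	// fail immediately, as the doc comment notes.
	if err := cl.BeginTransaction(); err != nil {
		panic(err)
	}
	res := cl.ProduceSync(ctx, kgo.StringRecord("hello"), kgo.StringRecord("world"))

	// If anything failed to produce, abort rather than commit.
	commit := kgo.TryCommit
	if err := res.FirstErr(); err != nil {
		fmt.Println("produce failed, aborting instead of committing:", err)
		commit = kgo.TryAbort
	}
	if err := cl.EndTransaction(ctx, commit); err != nil {
		panic(err)
	}
}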
@@ -1449,7 +1449,7 @@ func Balancers(balancers ...GroupBalancer) GroupOpt {
// in this timeout, the broker will remove the member from the group and
// initiate a rebalance.
//
// If you are using a GroupTransactSession for EOS, wish to lower this, and are
// If you are using a [GroupTransactSession] for EOS, wish to lower this, and are
// talking to a Kafka cluster pre 2.5, consider lowering the
// TransactionTimeout. If you do not, you risk a transaction finishing after a
// group has rebalanced, which could lead to duplicate processing. If you are
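The advice in the hunk above couples the group session timeout with TransactionTimeout for EOS against a pre-2.5 cluster. A minimal sketch of wiring the two together on a GroupTransactSession follows; the broker address, IDs, topic, and durations are placeholders, and the exact values are a judgment call rather than anything this PR prescribes.

package main

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Keeping TransactionTimeout at or below the group session timeout narrows
	// the window in which a transaction can finish after the group has already
	// rebalanced on a pre-2.5 cluster (the duplicate-processing risk described
	// in the doc comment above).
	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.TransactionalID("my-txn-id"),  // placeholder transactional ID
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("txn-in"),       // placeholder input topic
		kgo.SessionTimeout(30*time.Second),
		kgo.TransactionTimeout(20*time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer sess.Close()
}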
1 change: 1 addition & 0 deletions pkg/kgo/metadata.go
@@ -504,6 +504,7 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti
failing: mp.loadErr != 0,
sink: mp.sns.sink,
topicPartitionData: td,
lastAckedOffset: -1,
}
} else {
p.cursor = &cursor{
33 changes: 33 additions & 0 deletions pkg/kgo/sink.go
@@ -854,6 +854,20 @@ func (s *sink) handleReqRespBatch(
// handling, but KIP-360 demonstrated that resetting sequence
// numbers is fundamentally unsafe, so we treat it like OOOSN.
//
// KAFKA-5793 specifically mentions for OOOSN "when you get it,
// it should always mean data loss". Sometime after KIP-360,
// Kafka changed the client to remove all places where
// UnknownProducerID was returned, and then started referring
// to OOOSN as retryable. KIP-890 definitively says OOOSN is
// retryable. However, the Kafka source as of 24-10-10 still
// only retries OOOSN for batches that are NOT the expected
// next batch (i.e., the batch is next + 1, for when there are
// multiple in flight). With KIP-890, we still just disregard
// whatever supposedly non-retryable / actually-is-retryable
// error is returned if the LogStartOffset is _after_ what we
// previously produced. Specifically, this is step (4) in the
// wiki link within KAFKA-5793.
//
// InvalidMapping is similar to UnknownProducerID, but occurs
// when the txnal coordinator timed out our transaction.
//
@@ -881,6 +895,22 @@
// txn coordinator requests, which have PRODUCER_FENCED vs
// TRANSACTION_TIMED_OUT.

if batch.owner.lastAckedOffset >= 0 && rp.LogStartOffset > batch.owner.lastAckedOffset {
s.cl.cfg.logger.Log(LogLevelInfo, "partition prefix truncation to after our last produce caused the broker to forget us; no loss occurred, bumping producer epoch and resetting sequence numbers",
"broker", logID(s.nodeID),
"topic", topic,
"partition", rp.Partition,
"producer_id", producerID,
"producer_epoch", producerEpoch,
"err", err,
)
s.cl.failProducerID(producerID, producerEpoch, errReloadProducerID)
if debug {
fmt.Fprintf(b, "resetting@%d,%d(%s)}, ", rp.BaseOffset, nrec, err)
}
return true, false
}

if s.cl.cfg.txnID != nil || s.cl.cfg.stopOnDataLoss {
s.cl.cfg.logger.Log(LogLevelInfo, "batch errored, failing the producer ID",
"broker", logID(s.nodeID),
@@ -951,6 +981,7 @@
)
} else {
batch.owner.okOnSink = true
batch.owner.lastAckedOffset = rp.BaseOffset + int64(len(batch.records))
}
s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, rp.Partition, rp.BaseOffset, err)
didProduce = err == nil
@@ -1222,6 +1253,8 @@ type recBuf struct {
// to drain.
inflight uint8

lastAckedOffset int64 // BaseOffset of the last successful ProduceResponse plus the number of records in that batch; -1 until something has been acked

topicPartitionData // updated in metadata migrateProductionTo (same spot sink is updated)

// seq is used for the seq in each record batch. It is incremented when
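Taken together, the changes above initialize lastAckedOffset to -1 (metadata.go), advance it to BaseOffset plus the batch's record count on every successful produce, and use it to decide whether an unknown-producer/OOOSN-style error merely reflects prefix truncation past everything we already acked. The following standalone sketch uses made-up offsets and simply mirrors the added check; it is an illustration, not code from the PR.

package main

import "fmt"

// forgottenByTruncation mirrors the condition added in handleReqRespBatch:
// the error is treated as harmless prefix truncation only if we have acked
// at least one batch (lastAckedOffset >= 0) and the log now starts strictly
// after everything we acked.
func forgottenByTruncation(lastAckedOffset, logStartOffset int64) bool {
	return lastAckedOffset >= 0 && logStartOffset > lastAckedOffset
}

func main() {
	// Made-up numbers: a 5-record batch was acked at BaseOffset 100, so it
	// occupied offsets 100..104 and lastAckedOffset became 100 + 5 = 105.
	lastAckedOffset := int64(100 + 5)

	// Retention or DeleteRecords later truncates the partition to offset 300,
	// wiping the broker's memory of this producer. The next produce fails with
	// the OOOSN/UnknownProducerID family of errors, but nothing we acked was
	// lost, so the client reloads the producer ID and resets sequence numbers.
	fmt.Println(forgottenByTruncation(lastAckedOffset, 300)) // true

	// If instead the log now starts at 103, part of what we acked is gone, so
	// the existing data-loss handling still applies.
	fmt.Println(forgottenByTruncation(lastAckedOffset, 103)) // false
}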