From e62b402edded08a074145af45c784e8f498495dd Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sun, 9 Jun 2024 19:58:24 -0600
Subject: [PATCH] kgo sink: do not back off in a certain edge case

Consider the following sequence:

* A produce request is created and is about to be issued

* A metadata request resolves, removes the broker the request was about
  to be sent to, and updates leadership for the partition

* The recBuf's `sink` field is updated

* The old sink then enters handleReqResp, and eventually
  handleRetryBatches

Previously,

* The failed partition triggers a metadata refresh and enters a failed
  state until the metadata refresh clears the failing state. Because a
  metadata refresh JUST happened, internally this causes a 5s wait by
  default

Now,

* The failed partition notices that it is actually NOW on a different
  broker than the broker handling the failure, does not back off at
  all, and potentially triggers draining on the new sink once
  decInflight runs

Closes #746.
---
 pkg/kgo/sink.go | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 49bff3b1..d7db8cff 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -942,6 +942,24 @@ func (s *sink) handleRetryBatches(
 			return
 		}
 
+		// If the request failed due to a concurrent metadata update
+		// moving partitions to a different sink (or killing the sink
+		// this partition was on), we can just reset the drain index
+		// and trigger draining now on the new sink. There is no reason
+		// to back off on this sink nor to trigger a metadata update.
+		if batch.owner.sink != s {
+			if debug {
+				logger.Log(LogLevelDebug, "transitioned sinks while a request was inflight, retrying immediately on new sink without backoff",
+					"topic", batch.owner.topic,
+					"partition", batch.owner.partition,
+					"old_sink", s.nodeID,
+					"new_sink", batch.owner.sink.nodeID,
+				)
+			}
+			batch.owner.resetBatchDrainIdx()
+			return
+		}
+
 		if canFail || s.cl.cfg.disableIdempotency {
 			if err := batch.maybeFailErr(&s.cl.cfg); err != nil {
 				batch.owner.failAllRecords(err)
@@ -1003,6 +1021,8 @@
 	// If neither of these cases are true, then we entered wanting a
 	// metadata update, but the batches either were not the first batch, or
 	// the batches were concurrently failed.
+	//
+	// If all partitions are moving, we do not need to back off nor drain.
 	if shouldBackoff || (!updateMeta && numRetryBatches != numMoveBatches) {
 		s.maybeTriggerBackoff(backoffSeq)
 		s.maybeDrain()
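
Illustrative note (not part of the patch): below is a minimal, standalone Go
sketch of the ownership check described above. The `sink` and `recBuf` types
here are simplified stand-ins for the real kgo internals, and `handleRetry` is
a hypothetical helper; it only demonstrates the decision to skip backoff when
a partition's owning sink changed while a request was in flight.

package main

import "fmt"

// sink and recBuf are simplified stand-ins for the kgo types; only the
// fields needed to illustrate the ownership check are kept.
type sink struct{ nodeID int32 }

type recBuf struct {
	topic     string
	partition int32
	sink      *sink // may be swapped by a concurrent metadata update
	drainIdx  int
}

// resetBatchDrainIdx rewinds the buffer so its batches are drained again
// from the start, mirroring what the patch does on the new owner.
func (r *recBuf) resetBatchDrainIdx() { r.drainIdx = 0 }

// handleRetry mirrors the new branch: if the partition now belongs to a
// different sink than the one handling the failure, reset the drain index
// and skip both the backoff and the metadata-refresh path.
func handleRetry(failing *sink, owner *recBuf) (backoff bool) {
	if owner.sink != failing {
		fmt.Printf("partition %s/%d moved from node %d to node %d; retrying immediately\n",
			owner.topic, owner.partition, failing.nodeID, owner.sink.nodeID)
		owner.resetBatchDrainIdx()
		return false
	}
	return true // fall through to the existing backoff / metadata logic
}

func main() {
	oldSink := &sink{nodeID: 1}
	newSink := &sink{nodeID: 2}
	rb := &recBuf{topic: "foo", partition: 0, sink: newSink, drainIdx: 3}
	fmt.Println("backoff:", handleRetry(oldSink, rb)) // backoff: false
}

In the real code this check runs per batch inside handleRetryBatches; any
batch whose owner still points at the failing sink falls through to the
existing failure handling (backoff, metadata update, or record failure).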