diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 49bff3b1..d7db8cff 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -942,6 +942,24 @@ func (s *sink) handleRetryBatches(
 			return
 		}
 
+		// If the request failed due to a concurrent metadata update
+		// moving partitions to a different sink (or killing the sink
+		// this partition was on), we can just reset the drain index
+		// and trigger draining now on the new sink. There is no reason
+		// to backoff on this sink nor trigger a metadata update.
+		if batch.owner.sink != s {
+			if debug {
+				logger.Log(LogLevelDebug, "transitioned sinks while a request was inflight, retrying immediately on new sink without backoff",
+					"topic", batch.owner.topic,
+					"partition", batch.owner.partition,
+					"old_sink", s.nodeID,
+					"new_sink", batch.owner.sink.nodeID,
+				)
+			}
+			batch.owner.resetBatchDrainIdx()
+			return
+		}
+
 		if canFail || s.cl.cfg.disableIdempotency {
 			if err := batch.maybeFailErr(&s.cl.cfg); err != nil {
 				batch.owner.failAllRecords(err)
@@ -1003,6 +1021,8 @@ func (s *sink) handleRetryBatches(
 	// If neither of these cases are true, then we entered wanting a
 	// metadata update, but the batches either were not the first batch, or
 	// the batches were concurrently failed.
+	//
+	// If all partitions are moving, we do not need to backoff nor drain.
 	if shouldBackoff || (!updateMeta && numRetryBatches != numMoveBatches) {
 		s.maybeTriggerBackoff(backoffSeq)
 		s.maybeDrain()