diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index f7b29f62..cfc6aa14 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -46,6 +46,7 @@ type producer struct { // field on recBufs that is toggled on flush. If we did, then a new // recBuf could be created and records sent to while we are flushing. flushing atomicI32 // >0 if flushing, can Flush many times concurrently + blocked atomicI32 // >0 if over max recs or bytes aborting atomicI32 // >0 if aborting, can abort many times concurrently @@ -425,6 +426,12 @@ func (cl *Client) produce( "over_max_records", overMaxRecs, "over_max_bytes", overMaxBytes, ) + // Before we potentially unlinger, add that we are blocked. + // Lingering always checks blocked, so we will not start a + // linger while we are blocked. We THEN wakeup anything that + // is actively lingering. + cl.producer.blocked.Add(1) + cl.unlingerDueToMaxRecsBuffered() // If the client ctx cancels or the produce ctx cancels, we // need to un-count our buffering of this record. We also need // to drain a slot from the waitBuffer chan, which could be @@ -432,6 +439,7 @@ func (cl *Client) produce( drainBuffered := func(err error) { p.promiseRecord(promisedRec{ctx, promise, r}, err) <-p.waitBuffer + cl.producer.blocked.Add(-1) } if !block || cl.cfg.manualFlushing { drainBuffered(ErrMaxBuffered) @@ -963,6 +971,18 @@ func (cl *Client) waitUnknownTopic( }) } +func (cl *Client) unlingerDueToMaxRecsBuffered() { + if cl.cfg.linger <= 0 { + return + } + for _, parts := range cl.producer.topics.load() { + for _, part := range parts.load().partitions { + part.records.unlingerAndManuallyDrain() + } + } + cl.cfg.logger.Log(LogLevelDebug, "unlingered all partitions due to hitting max buffered") +} + // Flush hangs waiting for all buffered records to be flushed, stopping all // lingers if necessary. // diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index d7941712..49bff3b1 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1257,7 +1257,7 @@ func (recBuf *recBuf) tryStopLingerForDraining() bool { // Begins a linger timer unless the producer is being flushed. func (recBuf *recBuf) lockedMaybeStartLinger() bool { - if recBuf.cl.producer.flushing.Load() > 0 { + if recBuf.cl.producer.flushing.Load() > 0 || recBuf.cl.producer.blocked.Load() > 0 { return false } recBuf.lingering = time.AfterFunc(recBuf.cl.cfg.linger, recBuf.sink.maybeDrain)