From db24bbf327d5cdddb727a57760c24b87f06c898a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 23 May 2024 00:24:25 -0600 Subject: [PATCH] kgo: avoid / wakeup lingering if we hit max bytes or max records Currently if your linger is long and your max records small, it is possible to linger even if the client cannot buffer any more records. Now, once max records or bytes is hit, we wakeup anything lingering and avoid entering the linger state again -- until we are no longer over max. Closes #726. --- pkg/kgo/producer.go | 20 ++++++++++++++++++++ pkg/kgo/sink.go | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index f7b29f62..cfc6aa14 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -46,6 +46,7 @@ type producer struct { // field on recBufs that is toggled on flush. If we did, then a new // recBuf could be created and records sent to while we are flushing. flushing atomicI32 // >0 if flushing, can Flush many times concurrently + blocked atomicI32 // >0 if over max recs or bytes aborting atomicI32 // >0 if aborting, can abort many times concurrently @@ -425,6 +426,12 @@ func (cl *Client) produce( "over_max_records", overMaxRecs, "over_max_bytes", overMaxBytes, ) + // Before we potentially unlinger, add that we are blocked. + // Lingering always checks blocked, so we will not start a + // linger while we are blocked. We THEN wakeup anything that + // is actively lingering. + cl.producer.blocked.Add(1) + cl.unlingerDueToMaxRecsBuffered() // If the client ctx cancels or the produce ctx cancels, we // need to un-count our buffering of this record. We also need // to drain a slot from the waitBuffer chan, which could be @@ -432,6 +439,7 @@ func (cl *Client) produce( drainBuffered := func(err error) { p.promiseRecord(promisedRec{ctx, promise, r}, err) <-p.waitBuffer + cl.producer.blocked.Add(-1) } if !block || cl.cfg.manualFlushing { drainBuffered(ErrMaxBuffered) @@ -963,6 +971,18 @@ func (cl *Client) waitUnknownTopic( }) } +func (cl *Client) unlingerDueToMaxRecsBuffered() { + if cl.cfg.linger <= 0 { + return + } + for _, parts := range cl.producer.topics.load() { + for _, part := range parts.load().partitions { + part.records.unlingerAndManuallyDrain() + } + } + cl.cfg.logger.Log(LogLevelDebug, "unlingered all partitions due to hitting max buffered") +} + // Flush hangs waiting for all buffered records to be flushed, stopping all // lingers if necessary. // diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 0f23b067..fa21bd43 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1229,7 +1229,7 @@ func (recBuf *recBuf) tryStopLingerForDraining() bool { // Begins a linger timer unless the producer is being flushed. func (recBuf *recBuf) lockedMaybeStartLinger() bool { - if recBuf.cl.producer.flushing.Load() > 0 { + if recBuf.cl.producer.flushing.Load() > 0 || recBuf.cl.producer.blocked.Load() > 0 { return false } recBuf.lingering = time.AfterFunc(recBuf.cl.cfg.linger, recBuf.sink.maybeDrain)