From 58d20a13ecf50bb5352d54f10b152cabd50bdb91 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 20 Jul 2024 18:52:36 -0600 Subject: [PATCH] producer: fully account that we are unblocked The select has three cases; two use drainBuffered which properly added -1 to the blocked count. The first case didn't. We now do. Practically, this meant that if you ever were blocked while lingering, you would never linger again. For #726. --- pkg/kgo/producer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index cfc6aa14..f09b4712 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -448,6 +448,7 @@ func (cl *Client) produce( select { case <-p.waitBuffer: cl.cfg.logger.Log(LogLevelDebug, "Produce block signaled, continuing to produce") + cl.producer.blocked.Add(-1) case <-cl.ctx.Done(): drainBuffered(ErrClientClosed) cl.cfg.logger.Log(LogLevelDebug, "client ctx canceled while blocked in Produce, returning")