diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index cfc6aa14..f09b4712 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -448,6 +448,7 @@ func (cl *Client) produce( select { case <-p.waitBuffer: cl.cfg.logger.Log(LogLevelDebug, "Produce block signaled, continuing to produce") + cl.producer.blocked.Add(-1) case <-cl.ctx.Done(): drainBuffered(ErrClientClosed) cl.cfg.logger.Log(LogLevelDebug, "client ctx canceled while blocked in Produce, returning")