From 7eae3c3f40eb919902b08a96a09e3bd338c870e4 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 9 Oct 2024 18:21:49 -0600 Subject: [PATCH] 831 --- pkg/kgo/produce_request_test.go | 41 +++++++++++++++++++++++++++++++++ pkg/kgo/producer.go | 22 ++++++++++++++---- pkg/kgo/record_and_fetch.go | 4 ++-- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go index 1270354a..2b04e3f8 100644 --- a/pkg/kgo/produce_request_test.go +++ b/pkg/kgo/produce_request_test.go @@ -195,6 +195,47 @@ func TestIssue769(t *testing.T) { } } +func TestIssue831(t *testing.T) { + t.Parallel() + + topic, cleanup := tmpTopic(t) + defer cleanup() + + cl, _ := newTestClient( + DefaultProduceTopic(topic), + UnknownTopicRetries(-1), + MaxBufferedRecords(1), + ) + defer cl.Close() + + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + ctx := context.Background() + var cancel func() + if i%2 == 0 { + ctx, cancel = context.WithCancel(ctx) + cancel() + } + + wg.Add(1) + cl.Produce(ctx, &Record{Value: []byte("foo")}, func(*Record, error) { + wg.Done() + }) + } + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-done: + case <-time.After(15 * time.Second): + t.Fatal("still trying to produce after delay") + } +} + // This file contains golden tests against kmsg AppendTo's to ensure our custom // encoding is correct. diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index d9cca992..1ea2a29f 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -480,12 +480,24 @@ func (cl *Client) produce( }() drainBuffered := func(err error) { - p.mu.Lock() - quit = true + // The expected case here is that a context was + // canceled while we were waiting for space, so we are + // exiting and need to kill the goro above. + // + // However, it is possible that the goro above has + // already exited AND the context was canceled, and + // `select` chose the context-canceled case.
+ // + // So, to avoid a deadlock, we need to wake up the + // goro above in another goroutine. + go func() { + p.mu.Lock() + quit = true + p.mu.Unlock() + p.c.Broadcast() + }() + <-wait // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked) p.mu.Unlock() - p.c.Broadcast() // wake the goroutine above - <-wait - p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked) p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err) } diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 4f1ebe6f..30a6a33a 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -345,8 +345,8 @@ type FetchError struct { // client for. // // 3. an untyped batch parse failure; these are usually unrecoverable by -// restarts, and it may be best to just let the client continue. However, -// restarting is an option, but you may need to manually repair your +// restarts, and it may be best to just let the client continue. +// Restarting is an option, but you may need to manually repair your // partition. // // 4. an injected ErrClientClosed; this is a fatal informational error that