Skip to content

Commit

Permalink
831
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Oct 10, 2024
1 parent b77dd13 commit 7eae3c3
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
41 changes: 41 additions & 0 deletions pkg/kgo/produce_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,47 @@ func TestIssue769(t *testing.T) {
}
}

func TestIssue831(t *testing.T) {
t.Parallel()

topic, cleanup := tmpTopic(t)
defer cleanup()

cl, _ := newTestClient(
DefaultProduceTopic(topic),
UnknownTopicRetries(-1),
MaxBufferedRecords(1),
)
defer cl.Close()

var wg sync.WaitGroup
for i := 0; i < 50; i++ {
ctx := context.Background()
var cancel func()
if i%2 == 0 {
ctx, cancel = context.WithCancel(ctx)
cancel()
}

wg.Add(1)
cl.Produce(ctx, &Record{Value: []byte("foo")}, func(*Record, error) {
wg.Done()
})
}

done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()

select {
case <-done:
case <-time.After(15 * time.Second):
t.Fatal("still trying to produce after delay")
}
}

// This file contains golden tests against kmsg AppendTo's to ensure our custom
// encoding is correct.

Expand Down
22 changes: 17 additions & 5 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,24 @@ func (cl *Client) produce(
}()

drainBuffered := func(err error) {
p.mu.Lock()
quit = true
// The expected case here is that a context was
// canceled while we we waiting for space, so we are
// exiting and need to kill the goro above.
//
// However, it is possible that the goro above has
// already exited AND the context was canceled, and
// `select` chose the context-canceled case.
//
// So, to avoid a deadlock, we need to wakeup the
// goro above in another goroutine.
go func() {
p.mu.Lock()
quit = true
p.mu.Unlock()
p.c.Broadcast()
}()
<-wait // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
p.mu.Unlock()
p.c.Broadcast() // wake the goroutine above
<-wait
p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ type FetchError struct {
// client for.
//
// 3. an untyped batch parse failure; these are usually unrecoverable by
// restarts, and it may be best to just let the client continue. However,
// restarting is an option, but you may need to manually repair your
// restarts, and it may be best to just let the client continue.
// Restarting is an option, but you may need to manually repair your
// partition.
//
// 4. an injected ErrClientClosed; this is a fatal informational error that
Expand Down

0 comments on commit 7eae3c3

Please sign in to comment.