Skip to content

Commit

Permalink
Merge pull request #832 from twmb/831
Browse files Browse the repository at this point in the history
kgo: fix potential deadlock when reaching max buffered (records|bytes)
  • Loading branch information
twmb authored Oct 10, 2024
2 parents 1fdc835 + 0de933b commit b66ceb7
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ linters:
- asciicheck
- bidichk
- bodyclose
- copyloopvar
- durationcheck
- exhaustive
- exportloopref
- gocritic
- gofmt
- gofumpt
Expand Down Expand Up @@ -74,6 +74,7 @@ linters-settings:
excludes:
- G104 # unhandled errors, we exclude for the same reason we do not use errcheck
- G404 # we want math/rand
- G115 # irrelevant flags in this repo

# Gocritic is a meta linter that has very good lints, and most of the
# experimental ones are very good too. We opt into everything, which helps
Expand Down
6 changes: 3 additions & 3 deletions generate/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (s Struct) WriteDefault(l *LineWriter) {

func (s Struct) WriteDefn(l *LineWriter) {
if s.Comment != "" {
l.Write(s.Comment)
l.Write(s.Comment) //nolint:govet // ...
}
l.Write("type %s struct {", s.Name)
if s.TopLevel {
Expand Down Expand Up @@ -822,15 +822,15 @@ func (s Struct) WriteNewPtrFunc(l *LineWriter) {

func (e Enum) WriteDefn(l *LineWriter) {
if e.Comment != "" {
l.Write(e.Comment)
l.Write(e.Comment) //nolint:govet // ...
l.Write("// ")
}
l.Write("// Possible values and their meanings:")
l.Write("// ")
for _, v := range e.Values {
l.Write("// * %d (%s)", v.Value, v.Word)
if len(v.Comment) > 0 {
l.Write(v.Comment)
l.Write(v.Comment) //nolint:govet // ...
}
l.Write("//")
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/kgo/produce_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,43 @@ func TestIssue769(t *testing.T) {
}
}

// TestIssue831 fires many concurrent Produce calls, each with an
// already-canceled context, at a client that may buffer only one record.
// Every promise must be invoked within the deadline; if any call hangs, the
// test fails. Presumably this guards the max-buffered deadlock described in
// issue 831 -- confirm against the issue tracker.
func TestIssue831(t *testing.T) {
	t.Parallel()

	const (
		attempts = 500
		deadline = 15 * time.Second
	)

	topic, cleanup := tmpTopic(t)
	defer cleanup()

	cl, _ := newTestClient(
		DefaultProduceTopic(topic),
		UnknownTopicRetries(-1),
		MaxBufferedRecords(1),
	)
	defer cl.Close()

	// One WaitGroup slot per produce attempt; each promise releases one.
	var pending sync.WaitGroup
	pending.Add(attempts)
	promise := func(*Record, error) { pending.Done() }

	for n := attempts; n > 0; n-- {
		// Cancel before producing so the context is already done by the
		// time Produce looks at it.
		ctx, cancel := context.WithCancel(context.Background())
		cancel()

		go cl.Produce(ctx, &Record{Value: []byte("foo")}, promise)
	}

	// Bridge the WaitGroup to a channel so it can be raced with a timer.
	finished := make(chan struct{})
	go func() {
		pending.Wait()
		close(finished)
	}()

	timeout := time.NewTimer(deadline)
	defer timeout.Stop()

	select {
	case <-finished:
	case <-timeout.C:
		t.Fatal("still trying to produce after delay")
	}
}

// This file contains golden tests against kmsg AppendTo's to ensure our custom
// encoding is correct.

Expand Down
22 changes: 17 additions & 5 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,24 @@ func (cl *Client) produce(
}()

drainBuffered := func(err error) {
p.mu.Lock()
quit = true
// The expected case here is that a context was
// canceled while we were waiting for space, so we are
// exiting and need to kill the goro above.
//
// However, it is possible that the goro above has
// already exited AND the context was canceled, and
// `select` chose the context-canceled case.
//
// So, to avoid a deadlock, we need to wake up the
// goro above in another goroutine.
go func() {
p.mu.Lock()
quit = true
p.mu.Unlock()
p.c.Broadcast()
}()
<-wait // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
p.mu.Unlock()
p.c.Broadcast() // wake the goroutine above
<-wait
p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ type FetchError struct {
// client for.
//
// 3. an untyped batch parse failure; these are usually unrecoverable by
// restarts, and it may be best to just let the client continue. However,
// restarting is an option, but you may need to manually repair your
// restarts, and it may be best to just let the client continue.
// Restarting is an option, but you may need to manually repair your
// partition.
//
// 4. an injected ErrClientClosed; this is a fatal informational error that
Expand Down

0 comments on commit b66ceb7

Please sign in to comment.