diff --git a/.golangci.yml b/.golangci.yml index 2895cca6..193366c2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,9 +24,9 @@ linters: - asciicheck - bidichk - bodyclose + - copyloopvar - durationcheck - exhaustive - - exportloopref - gocritic - gofmt - gofumpt @@ -74,6 +74,7 @@ linters-settings: excludes: - G104 # unhandled errors, we exclude for the same reason we do not use errcheck - G404 # we want math/rand + - G115 # irrelevant flags in this repo # Gocritic is a meta linter that has very good lints, and most of the # experimental ones are very good too. We opt into everything, which helps diff --git a/generate/gen.go b/generate/gen.go index fec90d1c..4f7cf8e3 100644 --- a/generate/gen.go +++ b/generate/gen.go @@ -625,7 +625,7 @@ func (s Struct) WriteDefault(l *LineWriter) { func (s Struct) WriteDefn(l *LineWriter) { if s.Comment != "" { - l.Write(s.Comment) + l.Write(s.Comment) //nolint:govet // ... } l.Write("type %s struct {", s.Name) if s.TopLevel { @@ -822,7 +822,7 @@ func (s Struct) WriteNewPtrFunc(l *LineWriter) { func (e Enum) WriteDefn(l *LineWriter) { if e.Comment != "" { - l.Write(e.Comment) + l.Write(e.Comment) //nolint:govet // ... l.Write("// ") } l.Write("// Possible values and their meanings:") @@ -830,7 +830,7 @@ func (e Enum) WriteDefn(l *LineWriter) { for _, v := range e.Values { l.Write("// * %d (%s)", v.Value, v.Word) if len(v.Comment) > 0 { - l.Write(v.Comment) + l.Write(v.Comment) //nolint:govet // ... 
} l.Write("//") } diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go index 1270354a..220279a7 100644 --- a/pkg/kgo/produce_request_test.go +++ b/pkg/kgo/produce_request_test.go @@ -195,6 +195,43 @@ func TestIssue769(t *testing.T) { } } +func TestIssue831(t *testing.T) { + t.Parallel() + + topic, cleanup := tmpTopic(t) + defer cleanup() + + cl, _ := newTestClient( + DefaultProduceTopic(topic), + UnknownTopicRetries(-1), + MaxBufferedRecords(1), + ) + defer cl.Close() + + var wg sync.WaitGroup + for i := 0; i < 500; i++ { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + wg.Add(1) + go cl.Produce(ctx, &Record{Value: []byte("foo")}, func(*Record, error) { + wg.Done() + }) + } + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-done: + case <-time.After(15 * time.Second): + t.Fatal("still trying to produce after delay") + } +} + // This file contains golden tests against kmsg AppendTo's to ensure our custom // encoding is correct. diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index d9cca992..1ea2a29f 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -480,12 +480,24 @@ func (cl *Client) produce( }() drainBuffered := func(err error) { - p.mu.Lock() - quit = true + // The expected case here is that a context was + // canceled while we were waiting for space, so we are + // exiting and need to kill the goro above. + // + // However, it is possible that the goro above has + // already exited AND the context was canceled, and + // `select` chose the context-canceled case. + // + // So, to avoid a deadlock, we need to wake up the + // goro above in another goroutine. 
+ go func() { + p.mu.Lock() + quit = true + p.mu.Unlock() + p.c.Broadcast() + }() + <-wait // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked) p.mu.Unlock() - p.c.Broadcast() // wake the goroutine above - <-wait - p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked) p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err) } diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 4f1ebe6f..30a6a33a 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -345,8 +345,8 @@ type FetchError struct { // client for. // // 3. an untyped batch parse failure; these are usually unrecoverable by -// restarts, and it may be best to just let the client continue. However, -// restarting is an option, but you may need to manually repair your +// restarts, and it may be best to just let the client continue. +// Restarting is an option, but you may need to manually repair your // partition. // // 4. an injected ErrClientClosed; this is a fatal informational error that