From 9ff20d29ededdfddd7968b3462ae3ef69d900115 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 14 Jul 2024 19:53:12 -0700 Subject: [PATCH] kgo: add failure for 769 --- pkg/kgo/produce_request_test.go | 126 ++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/pkg/kgo/produce_request_test.go b/pkg/kgo/produce_request_test.go index 8a652a2f..66e06177 100644 --- a/pkg/kgo/produce_request_test.go +++ b/pkg/kgo/produce_request_test.go @@ -2,13 +2,139 @@ package kgo import ( "bytes" + "context" + "errors" "hash/crc32" "testing" + "time" "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kmsg" ) +// The produce below actually SUCCEEDS if the code for 769 is not working +// correctly. 769 is about a hanging produce not obeying a record cancelation, +// but we can simulate the same thing. +func TestIssue769(t *testing.T) { + t.Parallel() + + topic, cleanup := tmpTopic(t) + defer cleanup() + + cl, _ := newTestClient( + DefaultProduceTopic(topic), + UnknownTopicRetries(-1), + Dialer(new(slowDialer).DialContext), + ) + defer cl.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + canceled := &Record{Value: []byte("foo"), Context: ctx} + okay := &Record{Value: []byte("foo")} + + // First check: ensure that an already-canceled record bails right + // away. This actually bails in the unknown-topic bit of logic, + // although there is no way to surface that to the end user. + { + done := make(chan struct{}) + var rerr error + cl.Produce(context.Background(), canceled, func(_ *Record, err2 error) { + defer close(done) + rerr = err2 + }) + timer := time.NewTimer(3 * time.Second) + select { + case <-done: + case <-timer.C: + t.Fatal("expected record to fail within 3s") + } + if !errors.Is(rerr, context.Canceled) { + t.Errorf("got %v != exp context.Canceled", rerr) + } + } + + // We have to produce one record successfully to ensure the topic is + // known, then we modify the guts of the client to forget the loaded + // producer ID. + { + done := make(chan struct{}) + var rerr error + cl.Produce(context.Background(), okay, func(_ *Record, err2 error) { + defer close(done) + rerr = err2 + }) + <-done + if rerr != nil { + t.Fatal("unexpected error on the first produce") + } + cl.producer.id.Store(&producerID{ + id: -1, + epoch: -1, + err: errReloadProducerID, + }) + } + + // With a loaded topic but forgotten producer ID, we now ensure that a + // canceled record fails in the producer ID portion. + { + done := make(chan struct{}) + var rerr error + cl.Produce(context.Background(), canceled, func(_ *Record, err2 error) { + defer close(done) + rerr = err2 + }) + timer := time.NewTimer(3 * time.Second) + select { + case <-done: + case <-timer.C: + t.Fatal("expected record to fail within 3s") + } + if pe := (*errProducerIDLoadFail)(nil); !errors.As(rerr, &pe) || !errors.Is(pe.err, context.Canceled) { + t.Errorf("got %v != exp errProducerIDLoadFail{context.Canceled}", rerr) + } + } + + // We now produce successfully again to ensure the next attempt fails + // after the producer ID stage. + { + done := make(chan struct{}) + var rerr error + cl.Produce(context.Background(), okay, func(_ *Record, err2 error) { + defer close(done) + rerr = err2 + }) + cl.Flush(context.Background()) + <-done + if rerr != nil { + t.Fatal("unexpected error on the first produce") + } + } + + // This fails before the produce request is issued, which is the furthest we + // can take the test. We do not use record context's in issued produce requests. + { + done := make(chan struct{}) + var rerr error + cl.Produce(context.Background(), canceled, func(_ *Record, err2 error) { + defer close(done) + rerr = err2 + }) + timer := time.NewTimer(3 * time.Second) + select { + case <-done: + case <-timer.C: + t.Fatal("expected record to fail within 3s") + } + if pe := (*errProducerIDLoadFail)(nil); errors.As(rerr, &pe) { + t.Error("unexpectedly got errProducerIDLoadFail") + } + if !errors.Is(rerr, context.Canceled) { + t.Errorf("got %v != context.Canceled", rerr) + } + } +} + // This file contains golden tests against kmsg AppendTo's to ensure our custom // encoding is correct.