diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index b99a3c5c..76ba397b 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -636,7 +636,10 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) { } // SetOffsets sets any matching offsets in setOffsets to the given -// epoch/offset. Partitions that are not specified are not set. +// epoch/offset. Partitions that are not specified are not set. It is invalid +// to set topics that were not yet returned from a PollFetches: this function +// sets only partitions that were previously consumed, any extra partitions are +// skipped. // // If directly consuming, this function operates as expected given the caveats // of the prior paragraph. @@ -644,11 +647,11 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) { // If using transactions, it is advised to just use a GroupTransactSession and // avoid this function entirely. // -// If using group consuming, it is strongly recommended to use this function +// If using group consuming, It is strongly recommended to use this function // outside of the context of a PollFetches loop and only when you know the // group is not revoked (i.e., block any concurrent revoke while issuing this // call) and to not use this concurrent with committing. Any other usage is -// prone to odd interactions around rebalancing. +// prone to odd interactions. func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) { cl.setOffsets(setOffsets, true) } @@ -658,12 +661,6 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo return } - topics := make([]string, 0, len(setOffsets)) - for topic := range setOffsets { - topics = append(topics, topic) - } - cl.AddConsumeTopics(topics...) - // We assignPartitions before returning, so we grab the consumer lock // first to preserve consumer mu => group mu ordering, or to ensure // no concurrent metadata assign for direct consuming. @@ -750,18 +747,15 @@ func (cl *Client) AddConsumeTopics(topics ...string) { c.mu.Lock() defer c.mu.Unlock() - var added bool if c.g != nil { - added = c.g.tps.storeTopics(topics) + c.g.tps.storeTopics(topics) } else { - added = c.d.tps.storeTopics(topics) + c.d.tps.storeTopics(topics) for _, topic := range topics { - added = c.d.m.addt(topic) || added + c.d.m.addt(topic) } } - if added { - cl.triggerUpdateMetadataNow("from AddConsumeTopics") - } + cl.triggerUpdateMetadataNow("from AddConsumeTopics") } // AddConsumePartitions adds new partitions to be consumed at the given diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index b5c33e2b..f02b0d69 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "os" "sort" "sync/atomic" "testing" @@ -471,90 +470,6 @@ func TestIssue523(t *testing.T) { } } -func TestSetOffsetsForNewTopic(t *testing.T) { - t.Parallel() - t1, tcleanup := tmpTopicPartitions(t, 1) - defer tcleanup() - - { - cl, _ := newTestClient( - DefaultProduceTopic(t1), - MetadataMinAge(100*time.Millisecond), - FetchMaxWait(time.Second), - UnknownTopicRetries(-1), - ) - defer cl.Close() - - if err := cl.ProduceSync(context.Background(), StringRecord("foo"), StringRecord("bar")).FirstErr(); err != nil { - t.Fatal(err) - } - cl.Close() - } - - { - cl, _ := newTestClient( - MetadataMinAge(100*time.Millisecond), - FetchMaxWait(time.Second), - WithLogger(BasicLogger(os.Stderr, LogLevelDebug, nil)), - ) - defer cl.Close() - - cl.SetOffsets(map[string]map[int32]EpochOffset{ - t1: {0: EpochOffset{Epoch: -1, Offset: 1}}, - }) - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) - fs := cl.PollFetches(ctx) - cancel() - if errors.Is(fs.Err0(), context.DeadlineExceeded) { - t.Errorf("failed waiting for record") - return - } - if fs.NumRecords() != 1 { - t.Errorf("saw %d records, expected 1", fs.NumRecords()) - return - } - if string(fs.Records()[0].Value) != "bar" { - t.Errorf("incorrect record consumed") - return - } - cl.Close() - } - - // Duplicate above, but with a group. - { - g1, gcleanup := tmpGroup(t) - defer gcleanup() - - cl, _ := newTestClient( - MetadataMinAge(100*time.Millisecond), - FetchMaxWait(time.Second), - ConsumerGroup(g1), - WithLogger(BasicLogger(os.Stderr, LogLevelDebug, nil)), - ) - defer cl.Close() - - cl.SetOffsets(map[string]map[int32]EpochOffset{ - t1: {0: EpochOffset{Epoch: -1, Offset: 1}}, - }) - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) - fs := cl.PollFetches(ctx) - cancel() - if errors.Is(fs.Err0(), context.DeadlineExceeded) { - t.Errorf("failed waiting for record") - return - } - if fs.NumRecords() != 1 { - t.Errorf("saw %d records, expected 1", fs.NumRecords()) - return - } - if string(fs.Records()[0].Value) != "bar" { - t.Errorf("incorrect record consumed") - return - } - cl.Close() - } -} - func TestIssue648(t *testing.T) { t.Parallel() cl, _ := newTestClient( diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 71b7de07..b3bf6bf0 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -350,7 +350,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) { for topic := range latest { allTopics = append(allTopics, topic) } - tpsConsumerLoad, _ = tpsConsumer.ensureTopics(allTopics) + tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics) defer tpsConsumer.storeData(tpsConsumerLoad) // For regex consuming, if a topic is not returned in the diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go index 56af5389..91b40c10 100644 --- a/pkg/kgo/topics_and_partitions.go +++ b/pkg/kgo/topics_and_partitions.go @@ -89,7 +89,7 @@ func (m *mtmps) add(t string, p int32) { mps[p] = struct{}{} } -func (m *mtmps) addt(t string) bool { +func (m *mtmps) addt(t string) { if *m == nil { *m = make(mtmps) } @@ -97,9 +97,7 @@ func (m *mtmps) addt(t string) bool { if mps == nil { mps = make(map[int32]struct{}) (*m)[t] = mps - return true } - return false } func (m mtmps) onlyt(t string) bool { @@ -292,12 +290,7 @@ func (t *topicsPartitions) load() topicsPartitionsData { return t.v.Load().(topicsPartitionsData) } func (t *topicsPartitions) storeData(d topicsPartitionsData) { t.v.Store(d) } -func (t *topicsPartitions) storeTopics(topics []string) bool { - v, added := t.ensureTopics(topics) - t.v.Store(v) - return added -} - +func (t *topicsPartitions) storeTopics(topics []string) { t.v.Store(t.ensureTopics(topics)) } func (t *topicsPartitions) clone() topicsPartitionsData { current := t.load() clone := make(map[string]*topicPartitions, len(current)) @@ -310,7 +303,7 @@ func (t *topicsPartitions) clone() topicsPartitionsData { // Ensures that the topics exist in the returned map, but does not store the // update. This can be used to update the data and store later, rather than // storing immediately. -func (t *topicsPartitions) ensureTopics(topics []string) (topicsPartitionsData, bool) { +func (t *topicsPartitions) ensureTopics(topics []string) topicsPartitionsData { var cloned bool current := t.load() for _, topic := range topics { @@ -322,7 +315,7 @@ func (t *topicsPartitions) ensureTopics(topics []string) (topicsPartitionsData, current[topic] = newTopicPartitions() } } - return current, cloned + return current } // Opposite of ensureTopics, this purges the input topics and *does* store.