Revert "pkg/kgo: remove SetOffsets restriction of "only previously polled partitions""

This reverts commit d739f01.

Actually buggy. Far too difficult to get correct.
twmb committed Jan 21, 2024
1 parent b45f393 commit 405d87d
Showing 4 changed files with 15 additions and 113 deletions.
26 changes: 10 additions & 16 deletions pkg/kgo/consumer.go
@@ -636,19 +636,22 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
}

// SetOffsets sets any matching offsets in setOffsets to the given
-// epoch/offset. Partitions that are not specified are not set.
+// epoch/offset. Partitions that are not specified are not set. It is invalid
+// to set topics that were not yet returned from a PollFetches: this function
+// sets only partitions that were previously consumed, any extra partitions are
+// skipped.
//
// If directly consuming, this function operates as expected given the caveats
// of the prior paragraph.
//
// If using transactions, it is advised to just use a GroupTransactSession and
// avoid this function entirely.
//
-// If using group consuming, it is strongly recommended to use this function
+// If using group consuming, It is strongly recommended to use this function
// outside of the context of a PollFetches loop and only when you know the
// group is not revoked (i.e., block any concurrent revoke while issuing this
// call) and to not use this concurrent with committing. Any other usage is
-// prone to odd interactions around rebalancing.
+// prone to odd interactions.
func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
	cl.setOffsets(setOffsets, true)
}
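A caller-side sketch of the restored contract may help here: build the offset map only from partitions that a prior PollFetches returned, because SetOffsets skips anything else. The broker address, topic name, and the rewind-to-start-of-poll behavior below are assumptions for the example only, not part of this commit.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumptions for this sketch: a local broker and an existing topic.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("example-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Poll first: SetOffsets only honors previously consumed partitions.
	fs := cl.PollFetches(context.Background())

	// Rewind each returned partition to the first offset seen in this poll.
	rewind := make(map[string]map[int32]kgo.EpochOffset)
	fs.EachPartition(func(p kgo.FetchTopicPartition) {
		if len(p.Records) == 0 {
			return
		}
		if rewind[p.Topic] == nil {
			rewind[p.Topic] = make(map[int32]kgo.EpochOffset)
		}
		rewind[p.Topic][p.Partition] = kgo.EpochOffset{Epoch: -1, Offset: p.Records[0].Offset}
	})
	cl.SetOffsets(rewind)
}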
@@ -658,12 +661,6 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
		return
	}

-	topics := make([]string, 0, len(setOffsets))
-	for topic := range setOffsets {
-		topics = append(topics, topic)
-	}
-	cl.AddConsumeTopics(topics...)
-
	// We assignPartitions before returning, so we grab the consumer lock
	// first to preserve consumer mu => group mu ordering, or to ensure
	// no concurrent metadata assign for direct consuming.
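The mu ordering noted in that comment is the standard deadlock-avoidance discipline: every path that needs both locks takes them in the same order. A generic sketch of the idea, with illustrative names rather than the client's actual fields:

package sketch

import "sync"

// Both locks are always acquired in the same order: outerMu first, then
// innerMu. Two goroutines can therefore never each hold one lock while
// waiting on the other.
type lockedConsumer struct {
	outerMu sync.Mutex // e.g. the consumer mu, always taken first
	innerMu sync.Mutex // e.g. the group mu, taken only while outerMu is held
}

func (c *lockedConsumer) withBothLocks(fn func()) {
	c.outerMu.Lock()
	defer c.outerMu.Unlock()
	c.innerMu.Lock()
	defer c.innerMu.Unlock()
	fn()
}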
@@ -750,18 +747,15 @@ func (cl *Client) AddConsumeTopics(topics ...string) {
	c.mu.Lock()
	defer c.mu.Unlock()

-	var added bool
	if c.g != nil {
-		added = c.g.tps.storeTopics(topics)
+		c.g.tps.storeTopics(topics)
	} else {
-		added = c.d.tps.storeTopics(topics)
+		c.d.tps.storeTopics(topics)
		for _, topic := range topics {
-			added = c.d.m.addt(topic) || added
+			c.d.m.addt(topic)
		}
	}
-	if added {
-		cl.triggerUpdateMetadataNow("from AddConsumeTopics")
-	}
+	cl.triggerUpdateMetadataNow("from AddConsumeTopics")
}

// AddConsumePartitions adds new partitions to be consumed at the given
85 changes: 0 additions & 85 deletions pkg/kgo/consumer_direct_test.go
@@ -4,7 +4,6 @@ import (
	"context"
	"errors"
	"fmt"
-	"os"
	"sort"
	"sync/atomic"
	"testing"
@@ -471,90 +470,6 @@ func TestIssue523(t *testing.T) {
	}
}

-func TestSetOffsetsForNewTopic(t *testing.T) {
-	t.Parallel()
-	t1, tcleanup := tmpTopicPartitions(t, 1)
-	defer tcleanup()
-
-	{
-		cl, _ := newTestClient(
-			DefaultProduceTopic(t1),
-			MetadataMinAge(100*time.Millisecond),
-			FetchMaxWait(time.Second),
-			UnknownTopicRetries(-1),
-		)
-		defer cl.Close()
-
-		if err := cl.ProduceSync(context.Background(), StringRecord("foo"), StringRecord("bar")).FirstErr(); err != nil {
-			t.Fatal(err)
-		}
-		cl.Close()
-	}
-
-	{
-		cl, _ := newTestClient(
-			MetadataMinAge(100*time.Millisecond),
-			FetchMaxWait(time.Second),
-			WithLogger(BasicLogger(os.Stderr, LogLevelDebug, nil)),
-		)
-		defer cl.Close()
-
-		cl.SetOffsets(map[string]map[int32]EpochOffset{
-			t1: {0: EpochOffset{Epoch: -1, Offset: 1}},
-		})
-		ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
-		fs := cl.PollFetches(ctx)
-		cancel()
-		if errors.Is(fs.Err0(), context.DeadlineExceeded) {
-			t.Errorf("failed waiting for record")
-			return
-		}
-		if fs.NumRecords() != 1 {
-			t.Errorf("saw %d records, expected 1", fs.NumRecords())
-			return
-		}
-		if string(fs.Records()[0].Value) != "bar" {
-			t.Errorf("incorrect record consumed")
-			return
-		}
-		cl.Close()
-	}
-
-	// Duplicate above, but with a group.
-	{
-		g1, gcleanup := tmpGroup(t)
-		defer gcleanup()
-
-		cl, _ := newTestClient(
-			MetadataMinAge(100*time.Millisecond),
-			FetchMaxWait(time.Second),
-			ConsumerGroup(g1),
-			WithLogger(BasicLogger(os.Stderr, LogLevelDebug, nil)),
-		)
-		defer cl.Close()
-
-		cl.SetOffsets(map[string]map[int32]EpochOffset{
-			t1: {0: EpochOffset{Epoch: -1, Offset: 1}},
-		})
-		ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
-		fs := cl.PollFetches(ctx)
-		cancel()
-		if errors.Is(fs.Err0(), context.DeadlineExceeded) {
-			t.Errorf("failed waiting for record")
-			return
-		}
-		if fs.NumRecords() != 1 {
-			t.Errorf("saw %d records, expected 1", fs.NumRecords())
-			return
-		}
-		if string(fs.Records()[0].Value) != "bar" {
-			t.Errorf("incorrect record consumed")
-			return
-		}
-		cl.Close()
-	}
-}
-
func TestIssue648(t *testing.T) {
	t.Parallel()
	cl, _ := newTestClient(
2 changes: 1 addition & 1 deletion pkg/kgo/metadata.go
@@ -350,7 +350,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
		for topic := range latest {
			allTopics = append(allTopics, topic)
		}
-		tpsConsumerLoad, _ = tpsConsumer.ensureTopics(allTopics)
+		tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
		defer tpsConsumer.storeData(tpsConsumerLoad)

		// For regex consuming, if a topic is not returned in the
15 changes: 4 additions & 11 deletions pkg/kgo/topics_and_partitions.go
@@ -89,17 +89,15 @@ func (m *mtmps) add(t string, p int32) {
	mps[p] = struct{}{}
}

-func (m *mtmps) addt(t string) bool {
+func (m *mtmps) addt(t string) {
	if *m == nil {
		*m = make(mtmps)
	}
	mps := (*m)[t]
	if mps == nil {
		mps = make(map[int32]struct{})
		(*m)[t] = mps
-		return true
	}
-	return false
}

func (m mtmps) onlyt(t string) bool {
@@ -292,12 +290,7 @@ func (t *topicsPartitions) load() topicsPartitionsData {
	return t.v.Load().(topicsPartitionsData)
}
func (t *topicsPartitions) storeData(d topicsPartitionsData) { t.v.Store(d) }
-func (t *topicsPartitions) storeTopics(topics []string) bool {
-	v, added := t.ensureTopics(topics)
-	t.v.Store(v)
-	return added
-}
-
+func (t *topicsPartitions) storeTopics(topics []string) { t.v.Store(t.ensureTopics(topics)) }
func (t *topicsPartitions) clone() topicsPartitionsData {
	current := t.load()
	clone := make(map[string]*topicPartitions, len(current))
@@ -310,7 +303,7 @@ func (t *topicsPartitions) clone() topicsPartitionsData {
// Ensures that the topics exist in the returned map, but does not store the
// update. This can be used to update the data and store later, rather than
// storing immediately.
-func (t *topicsPartitions) ensureTopics(topics []string) (topicsPartitionsData, bool) {
+func (t *topicsPartitions) ensureTopics(topics []string) topicsPartitionsData {
	var cloned bool
	current := t.load()
	for _, topic := range topics {
@@ -322,7 +315,7 @@ func (t *topicsPartitions) ensureTopics(topics []string) (topicsPartitionsData,
			current[topic] = newTopicPartitions()
		}
	}
-	return current, cloned
+	return current
}

// Opposite of ensureTopics, this purges the input topics and *does* store.
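The ensureTopics change above only drops the "did anything change" return value; the clone-on-first-write pattern described in its comment is unchanged. A reduced sketch of that pattern, with a plain map standing in for topicsPartitionsData (names here are illustrative):

package sketch

// ensure returns a map containing every requested topic. The loaded snapshot
// is copied at most once, and only when a topic is actually missing; the
// caller decides when to store the result back.
func ensure(current map[string]struct{}, topics []string) map[string]struct{} {
	cloned := false
	for _, topic := range topics {
		if _, exists := current[topic]; exists {
			continue
		}
		if !cloned {
			copied := make(map[string]struct{}, len(current)+len(topics))
			for k, v := range current {
				copied[k] = v
			}
			current = copied
			cloned = true
		}
		current[topic] = struct{}{}
	}
	return current
}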
