Skip to content

Commit

Permalink
Merge pull request #665 from twmb/patches-setoffsets-revert
Browse files Browse the repository at this point in the history
SetOffsets patches revert
  • Loading branch information
twmb authored Jan 21, 2024
2 parents a8e33ff + 405d87d commit 7584d23
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 104 deletions.
26 changes: 10 additions & 16 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,19 +636,22 @@ func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) {
}

// SetOffsets sets any matching offsets in setOffsets to the given
// epoch/offset. Partitions that are not specified are not set.
// epoch/offset. Partitions that are not specified are not set. It is invalid
// to set topics that were not yet returned from a PollFetches: this function
// sets only partitions that were previously consumed, any extra partitions are
// skipped.
//
// If directly consuming, this function operates as expected given the caveats
// of the prior paragraph.
//
// If using transactions, it is advised to just use a GroupTransactSession and
// avoid this function entirely.
//
// If using group consuming, it is strongly recommended to use this function
// If using group consuming, It is strongly recommended to use this function
// outside of the context of a PollFetches loop and only when you know the
// group is not revoked (i.e., block any concurrent revoke while issuing this
// call) and to not use this concurrent with committing. Any other usage is
// prone to odd interactions around rebalancing.
// prone to odd interactions.
func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
cl.setOffsets(setOffsets, true)
}
Expand All @@ -658,12 +661,6 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
return
}

topics := make([]string, 0, len(setOffsets))
for topic := range setOffsets {
topics = append(topics, topic)
}
cl.AddConsumeTopics(topics...)

// We assignPartitions before returning, so we grab the consumer lock
// first to preserve consumer mu => group mu ordering, or to ensure
// no concurrent metadata assign for direct consuming.
Expand Down Expand Up @@ -750,18 +747,15 @@ func (cl *Client) AddConsumeTopics(topics ...string) {
c.mu.Lock()
defer c.mu.Unlock()

var added bool
if c.g != nil {
added = c.g.tps.storeTopics(topics)
c.g.tps.storeTopics(topics)
} else {
added = c.d.tps.storeTopics(topics)
c.d.tps.storeTopics(topics)
for _, topic := range topics {
added = c.d.m.addt(topic) || added
c.d.m.addt(topic)
}
}
if added {
cl.triggerUpdateMetadataNow("from AddConsumeTopics")
}
cl.triggerUpdateMetadataNow("from AddConsumeTopics")
}

// AddConsumePartitions adds new partitions to be consumed at the given
Expand Down
76 changes: 0 additions & 76 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sort"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -471,81 +470,6 @@ func TestIssue523(t *testing.T) {
}
}

func TestSetOffsetsForNewTopic(t *testing.T) {
t.Parallel()
t1, tcleanup := tmpTopicPartitions(t, 1)
defer tcleanup()

{
cl, _ := newTestClient(
DefaultProduceTopic(t1),
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
UnknownTopicRetries(-1),
)
defer cl.Close()

if err := cl.ProduceSync(context.Background(), StringRecord("foo")).FirstErr(); err != nil {
t.Fatal(err)
}
cl.Close()
}

{
cl, _ := newTestClient(
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
)
defer cl.Close()

cl.SetOffsets(map[string]map[int32]EpochOffset{
t1: {0: EpochOffset{Epoch: -1, Offset: 0}},
})
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
fs := cl.PollFetches(ctx)
cancel()
if errors.Is(fs.Err0(), context.DeadlineExceeded) {
t.Errorf("failed waiting for record")
return
}
if fs.NumRecords() == 0 {
t.Errorf("failed waiting for record")
return
}
cl.Close()
}

// Duplicate above, but with a group.
{
g1, gcleanup := tmpGroup(t)
defer gcleanup()

cl, _ := newTestClient(
MetadataMinAge(100*time.Millisecond),
FetchMaxWait(time.Second),
ConsumerGroup(g1),
WithLogger(BasicLogger(os.Stderr, LogLevelDebug, nil)),
)
defer cl.Close()

cl.SetOffsets(map[string]map[int32]EpochOffset{
t1: {0: EpochOffset{Epoch: -1, Offset: 0}},
})
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
fs := cl.PollFetches(ctx)
cancel()
if errors.Is(fs.Err0(), context.DeadlineExceeded) {
t.Errorf("failed waiting for record")
return
}
if fs.NumRecords() == 0 {
t.Errorf("failed waiting for record")
return
}
cl.Close()
}
}

func TestIssue648(t *testing.T) {
t.Parallel()
cl, _ := newTestClient(
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
for topic := range latest {
allTopics = append(allTopics, topic)
}
tpsConsumerLoad, _ = tpsConsumer.ensureTopics(allTopics)
tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
defer tpsConsumer.storeData(tpsConsumerLoad)

// For regex consuming, if a topic is not returned in the
Expand Down
15 changes: 4 additions & 11 deletions pkg/kgo/topics_and_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,15 @@ func (m *mtmps) add(t string, p int32) {
mps[p] = struct{}{}
}

func (m *mtmps) addt(t string) bool {
func (m *mtmps) addt(t string) {
if *m == nil {
*m = make(mtmps)
}
mps := (*m)[t]
if mps == nil {
mps = make(map[int32]struct{})
(*m)[t] = mps
return true
}
return false
}

func (m mtmps) onlyt(t string) bool {
Expand Down Expand Up @@ -292,12 +290,7 @@ func (t *topicsPartitions) load() topicsPartitionsData {
return t.v.Load().(topicsPartitionsData)
}
func (t *topicsPartitions) storeData(d topicsPartitionsData) { t.v.Store(d) }
func (t *topicsPartitions) storeTopics(topics []string) bool {
v, added := t.ensureTopics(topics)
t.v.Store(v)
return added
}

func (t *topicsPartitions) storeTopics(topics []string) { t.v.Store(t.ensureTopics(topics)) }
func (t *topicsPartitions) clone() topicsPartitionsData {
current := t.load()
clone := make(map[string]*topicPartitions, len(current))
Expand All @@ -310,7 +303,7 @@ func (t *topicsPartitions) clone() topicsPartitionsData {
// Ensures that the topics exist in the returned map, but does not store the
// update. This can be used to update the data and store later, rather than
// storing immediately.
func (t *topicsPartitions) ensureTopics(topics []string) (topicsPartitionsData, bool) {
func (t *topicsPartitions) ensureTopics(topics []string) topicsPartitionsData {
var cloned bool
current := t.load()
for _, topic := range topics {
Expand All @@ -322,7 +315,7 @@ func (t *topicsPartitions) ensureTopics(topics []string) (topicsPartitionsData,
current[topic] = newTopicPartitions()
}
}
return current, cloned
return current
}

// Opposite of ensureTopics, this purges the input topics and *does* store.
Expand Down

0 comments on commit 7584d23

Please sign in to comment.