kgo: do not rotate the consumer session when pausing topics/partitions #601

Merged: 1 commit, Oct 22, 2023
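This change removes the internal session bump that pausing used to trigger: the doc-comment edits below drop the claim that pausing "drops everything currently buffered and kills any in flight fetch requests", and the assignBumpSession path is deleted. Paused topics and partitions are instead stripped from buffered data at poll time. A minimal, hedged sketch of the consumer-side flow this affects (the broker address and topic name are placeholders, error handling is trimmed):

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("events"),       // placeholder topic
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Pause one partition. With this PR, the consumer session is no longer
	// rotated: in-flight fetches keep their cursors, and anything already
	// buffered for the paused partition is stripped when polled.
	cl.PauseFetchPartitions(map[string][]int32{"events": {0}})

	fs := cl.PollFetches(ctx)
	fs.EachRecord(func(r *kgo.Record) {
		fmt.Println(r.Topic, r.Partition, len(r.Value)) // partition 0 should not appear
	})

	cl.ResumeFetchPartitions(map[string][]int32{"events": {0}})
}
```

In the diff, stripped partitions get allowUsable without setOffset, so their buffered data is simply refetched once the partition is resumed.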
Changes from all commits
38 changes: 7 additions & 31 deletions pkg/kgo/consumer.go
@@ -433,6 +433,8 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
}()
}

paused := c.loadPaused()

// A group can grab the consumer lock then the group mu and
// assign partitions. The group mu is grabbed to update its
// uncommitted map. Assigning partitions clears sources ready
@@ -451,13 +453,13 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
c.sourcesReadyMu.Lock()
if maxPollRecords < 0 {
for _, ready := range c.sourcesReadyForDraining {
fetches = append(fetches, ready.takeBuffered())
fetches = append(fetches, ready.takeBuffered(paused))
}
c.sourcesReadyForDraining = nil
} else {
for len(c.sourcesReadyForDraining) > 0 && maxPollRecords > 0 {
source := c.sourcesReadyForDraining[0]
fetch, taken, drained := source.takeNBuffered(maxPollRecords)
fetch, taken, drained := source.takeNBuffered(paused, maxPollRecords)
if drained {
c.sourcesReadyForDraining = c.sourcesReadyForDraining[1:]
}
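Above, PollRecords snapshots the paused set once (paused := c.loadPaused()) and passes it into both drain paths, so a single poll applies one consistent view of what is paused. A rough, self-contained sketch of that poll-time filtering idea; pausedSet, record, and source here are simplified stand-ins, not kgo's types:

```go
package main

import "fmt"

// Stand-in types: the real client passes c.loadPaused() (a pausedTopics
// snapshot) into takeBuffered/takeNBuffered; a nested bool map plays that
// role here.
type pausedSet map[string]map[int32]bool

type record struct {
	topic     string
	partition int32
}

type source struct{ buffered []record }

// takeN drains up to n unpaused records; paused ones are skipped (stripped)
// rather than returned, mirroring the poll-time filtering in this PR.
func (s *source) takeN(paused pausedSet, n int) []record {
	var out []record
	for len(s.buffered) > 0 && n > 0 {
		r := s.buffered[0]
		s.buffered = s.buffered[1:]
		if paused[r.topic][r.partition] {
			continue // paused at poll time: not delivered
		}
		out = append(out, r)
		n--
	}
	return out
}

func main() {
	s := &source{buffered: []record{
		{"events", 0}, {"events", 1}, {"events", 0}, {"events", 1},
	}}
	paused := pausedSet{"events": {0: true}} // snapshot taken once per poll
	fmt.Println(s.takeN(paused, 10))         // only partition-1 records remain
}
```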
@@ -555,9 +557,7 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
// PauseFetchTopics sets the client to no longer fetch the given topics and
// returns all currently paused topics. Paused topics persist until resumed.
// You can call this function with no topics to simply receive the list of
// currently paused topics. Pausing topics drops everything currently buffered
// and kills any in flight fetch requests to ensure nothing that is paused
// can be returned anymore from polling.
// currently paused topics.
//
// Pausing topics is independent from pausing individual partitions with the
// PauseFetchPartitions method. If you pause partitions for a topic with
@@ -569,15 +569,8 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
if len(topics) == 0 {
return c.loadPaused().pausedTopics()
}

c.pausedMu.Lock()
defer c.pausedMu.Unlock()
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
}()

paused := c.clonePaused()
paused.addTopics(topics...)
c.storePaused(paused)
@@ -587,9 +580,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
// PauseFetchPartitions sets the client to no longer fetch the given partitions
// and returns all currently paused partitions. Paused partitions persist until
// resumed. You can call this function with no partitions to simply receive the
// list of currently paused partitions. Pausing partitions drops everything
// currently buffered and kills any in flight fetch requests to ensure nothing
// that is paused can be returned anymore from polling.
// list of currently paused partitions.
//
// Pausing individual partitions is independent from pausing topics with the
// PauseFetchTopics method. If you pause partitions for a topic with
@@ -601,15 +592,8 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
if len(topicPartitions) == 0 {
return c.loadPaused().pausedPartitions()
}

c.pausedMu.Lock()
defer c.pausedMu.Unlock()
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
}()

paused := c.clonePaused()
paused.addPartitions(topicPartitions)
c.storePaused(paused)
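Both hunks above keep topic-level and partition-level pause bookkeeping separate, as the doc comments describe. A hedged usage fragment assuming a configured *kgo.Client named cl, along the lines of the sketch near the top of the page ("foo" and "bar" are placeholder topic names):

```go
package example

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pauseIndependently shows topic-level and partition-level pauses being
// tracked separately.
func pauseIndependently(cl *kgo.Client) {
	fmt.Println(cl.PauseFetchTopics("foo"))                              // pause the whole topic
	fmt.Println(cl.PauseFetchPartitions(map[string][]int32{"bar": {0}})) // pause a single partition

	// Resuming at one granularity does not undo pauses made at the other.
	cl.ResumeFetchTopics("foo")
	cl.ResumeFetchPartitions(map[string][]int32{"bar": {0}})

	// Called with no arguments, both functions only report the current state.
	fmt.Println(cl.PauseFetchTopics())
	fmt.Println(cl.PauseFetchPartitions())
}
```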
@@ -884,10 +868,6 @@ const (
// The counterpart to assignInvalidateMatching, assignSetMatching
// resets all matching partitions to the specified offset / epoch.
assignSetMatching

// For pausing, we want to drop anything inflight. We start a new
// session with the old tps.
assignBumpSession
)

func (h assignHow) String() string {
@@ -902,8 +882,6 @@ func (h assignHow) String() string {
return "unassigning and purging any partition matching the input topics"
case assignSetMatching:
return "reassigning any currently assigned matching partition to the input"
case assignBumpSession:
return "bumping internal consumer session to drop anything currently in flight"
}
return ""
}
@@ -984,8 +962,6 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
// if we had no session before, which is why we need to pass in
// our topicPartitions.
session = c.guardSessionChange(tps)
} else if how == assignBumpSession {
loadOffsets, tps = c.stopSession()
} else {
loadOffsets, _ = c.stopSession()

@@ -1032,7 +1008,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
// assignment went straight to listing / epoch loading, and
// that list/epoch never finished.
switch how {
case assignWithoutInvalidating, assignBumpSession:
case assignWithoutInvalidating:
// Nothing to do -- this is handled above.
case assignInvalidateAll:
loadOffsets = listOrEpochLoads{}
9 changes: 7 additions & 2 deletions pkg/kgo/consumer_direct_test.go
@@ -307,8 +307,13 @@ func TestPauseIssue489(t *testing.T) {
}
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne = false, false
for i := 0; i < 5; i++ {
fs := cl.PollFetches(ctx)
for i := 0; i < 10; i++ {
var fs Fetches
if i < 5 {
fs = cl.PollFetches(ctx)
} else {
fs = cl.PollRecords(ctx, 2)
}
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
88 changes: 83 additions & 5 deletions pkg/kgo/source.go
@@ -344,8 +344,64 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {
}

// takeBuffered drains a buffered fetch and updates offsets.
func (s *source) takeBuffered() Fetch {
return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
func (s *source) takeBuffered(paused pausedTopics) Fetch {
if len(paused) == 0 {
return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
}
var strip map[string]map[int32]struct{}
f := s.takeBufferedFn(true, func(os usedOffsets) {
for t, ps := range os {
// If the entire topic is paused, we allowUsable all

Review comment on this hunk:

If you checked whether the topic was paused in some way at the top here, you could get rid of some of the conditionals below. For example (may want to add a method or two to pausedPartitions if you didn't want to access all or m):

pp, ok := pausedTopics[t]
if !ok {
	continue
}

// you know something is paused at this point
if strip == nil {
	strip = make(map[string]map[int32]struct{})
}

if pp.all {
	for _, o := range ps {
		o.from.allowUsable()
	}
	strip[t] = nil // initialize key, for existence-but-len-0 check below
	continue
}

// you know that specific partitions are paused at this point
stript := make(map[int32]struct{})
for _, o := range ps {
	if _, ok := pp.m[o.from.partition]; ok {
		o.from.allowUsable()
		stript[o.from.partition] = struct{}{}
		continue
	}
	o.from.setOffset(o.cursorOffset)
	o.from.allowUsable()
}
strip[t] = stript

twmb (Owner, Author) replied on Oct 21, 2023:

Not entirely -- it's possible that you paused "foo", but this fetch contains only data for "bar". This block of checking/stripping runs on every fetch response until you unpause everything, but once you pause something, what you paused is also not added to fetch requests. This stripping is only necessary basically for what was in flight at the time you added something to be paused.

edit: Rereading, I think you said something slightly differently from what I read it as.

twmb (Owner, Author):

Yes this logic is better, changing.

// and strip the topic entirely.
pps, ok := paused.t(t)
if !ok {
continue
}
if strip == nil {
strip = make(map[string]map[int32]struct{})
}
if pps.all {
for _, o := range ps {
o.from.allowUsable()
}
strip[t] = nil // initialize key, for existence-but-len-0 check below
continue
}
stript := make(map[int32]struct{})
strip[t] = stript
for _, o := range ps {
if _, ok := pps.m[o.from.partition]; ok {
o.from.allowUsable()
stript[o.from.partition] = struct{}{}
continue
}
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
}
})
if strip != nil {
keep := f.Topics[:0]
for _, t := range f.Topics {
stript, ok := strip[t.Topic]
if ok {
if len(stript) == 0 {
continue // stripping this entire topic
}
keepp := t.Partitions[:0]
for _, p := range t.Partitions {
if _, ok := stript[p.Partition]; ok {
continue
}
keepp = append(keepp, p)
}
t.Partitions = keepp
}
keep = append(keep, t)
}
f.Topics = keep
}
return f
}

func (s *source) discardBuffered() {
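The strip map built in takeBuffered above encodes two levels: a topic key left empty means the whole topic is dropped from the returned Fetch, while a non-empty inner set names only the partitions to drop. A simplified, self-contained sketch of that filtering step, with trimmed stand-ins for kgo's Fetch/FetchTopic/FetchPartition types:

```go
package main

import "fmt"

// Trimmed stand-ins, just enough to illustrate the strip-map filtering.
type fetchPartition struct{ Partition int32 }
type fetchTopic struct {
	Topic      string
	Partitions []fetchPartition
}
type fetch struct{ Topics []fetchTopic }

// stripPaused removes stripped topics/partitions from f. A topic mapped to a
// zero-length set means "drop the topic entirely"; otherwise only the listed
// partitions are dropped.
func stripPaused(f *fetch, strip map[string]map[int32]struct{}) {
	if strip == nil {
		return
	}
	keep := f.Topics[:0]
	for _, t := range f.Topics {
		if stript, ok := strip[t.Topic]; ok {
			if len(stript) == 0 {
				continue // whole topic paused: drop it
			}
			keepp := t.Partitions[:0]
			for _, p := range t.Partitions {
				if _, ok := stript[p.Partition]; ok {
					continue // this partition paused: drop it
				}
				keepp = append(keepp, p)
			}
			t.Partitions = keepp
		}
		keep = append(keep, t)
	}
	f.Topics = keep
}

func main() {
	f := &fetch{Topics: []fetchTopic{
		{Topic: "foo", Partitions: []fetchPartition{{0}, {1}}},
		{Topic: "bar", Partitions: []fetchPartition{{0}, {1}}},
	}}
	strip := map[string]map[int32]struct{}{
		"foo": nil,             // all of "foo" paused
		"bar": {1: struct{}{}}, // only bar partition 1 paused
	}
	stripPaused(f, strip)
	fmt.Printf("%+v\n", f) // only bar partition 0 remains
}
```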
@@ -359,7 +415,7 @@ func (s *source) discardBuffered() {
//
// This returns the number of records taken and whether the source has been
// completely drained.
func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
var r Fetch
var taken int

@@ -368,6 +424,17 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
for len(bf.Topics) > 0 && n > 0 {
t := &bf.Topics[0]

// If the topic is outright paused, we allowUsable all
// partitions in the topic and skip the topic entirely.
if paused.has(t.Topic, -1) {
bf.Topics = bf.Topics[1:]
for _, pCursor := range b.usedOffsets[t.Topic] {
pCursor.from.allowUsable()
}
delete(b.usedOffsets, t.Topic)
continue
}

r.Topics = append(r.Topics, *t)
rt := &r.Topics[len(r.Topics)-1]
rt.Partitions = nil
@@ -377,6 +444,17 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
for len(t.Partitions) > 0 && n > 0 {
p := &t.Partitions[0]

if paused.has(t.Topic, p.Partition) {
t.Partitions = t.Partitions[1:]
pCursor := tCursors[p.Partition]
pCursor.from.allowUsable()
delete(tCursors, p.Partition)
if len(tCursors) == 0 {
delete(b.usedOffsets, t.Topic)
}
continue
}

rt.Partitions = append(rt.Partitions, *p)
rp := &rt.Partitions[len(rt.Partitions)-1]

@@ -402,7 +480,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {
if len(tCursors) == 0 {
delete(b.usedOffsets, t.Topic)
}
break
continue
}

lastReturnedRecord := rp.Records[len(rp.Records)-1]
@@ -422,7 +500,7 @@ func (s *source) takeNBuffered(n int) (Fetch, int, bool) {

drained := len(bf.Topics) == 0
if drained {
s.takeBuffered()
s.takeBuffered(nil)
}
return r, taken, drained
}
8 changes: 8 additions & 0 deletions pkg/kgo/topics_and_partitions.go
@@ -133,6 +133,14 @@ type pausedPartitions struct {
m map[int32]struct{}
}

func (m pausedTopics) t(topic string) (pausedPartitions, bool) {
if len(m) == 0 { // potentially nil
return pausedPartitions{}, false
}
pps, exists := m[topic]
return pps, exists
}

func (m pausedTopics) has(topic string, partition int32) (paused bool) {
if len(m) == 0 {
return false
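For illustration, a self-contained sketch of the lookup semantics these helpers appear to provide, reconstructed from how source.go uses them above: paused.t(topic) gives a per-topic view, and paused.has(topic, partition) answers a single check, where a fully paused topic matches any partition (which is why source.go asks has(t.Topic, -1)). The types below are simplified assumptions, not the library's full definitions:

```go
package main

import "fmt"

// Simplified reconstructions of the types in topics_and_partitions.go; the
// real types may carry more state, but `all` and `m` are what the pausing
// logic above relies on.
type pausedPartitions struct {
	all bool
	m   map[int32]struct{}
}

type pausedTopics map[string]pausedPartitions

func (m pausedTopics) t(topic string) (pausedPartitions, bool) {
	if len(m) == 0 { // potentially nil
		return pausedPartitions{}, false
	}
	pps, exists := m[topic]
	return pps, exists
}

// has reports whether the topic/partition pair is paused; a topic paused in
// its entirety matches every partition.
func (m pausedTopics) has(topic string, partition int32) bool {
	pps, ok := m.t(topic)
	if !ok {
		return false
	}
	if pps.all {
		return true
	}
	_, ok = pps.m[partition]
	return ok
}

func main() {
	paused := pausedTopics{
		"foo": {all: true},
		"bar": {m: map[int32]struct{}{1: {}}},
	}
	fmt.Println(paused.has("foo", -1)) // true: the whole topic is paused
	fmt.Println(paused.has("bar", 1))  // true: that partition is paused
	fmt.Println(paused.has("bar", 0))  // false
	fmt.Println(paused.has("baz", 0))  // false: nothing paused for this topic
}
```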