-
-
Notifications
You must be signed in to change notification settings - Fork 193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kgo: do not rotate the consumer session when pausing topics/partitions #601
Conversation
c1092c5
to
23b0878
Compare
Was changing this as you commented, the newest version is the least allocating and iterating, but a good bit more confusing
…On Sat, Oct 21, 2023 at 10:21, dave sinclair ***@***.***(mailto:On Sat, Oct 21, 2023 at 10:21, dave sinclair <<a href=)> wrote:
@stampy88 commented on this pull request.
---------------------------------------------------------------
In [pkg/kgo/source.go](#601 (comment)):
> @@ -344,8 +344,38 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {
}
// takeBuffered drains a buffered fetch and updates offsets.
-func (s *source) takeBuffered() Fetch {
- return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
+func (s *source) takeBuffered(paused pausedTopics) Fetch {
+ var strip mtmps
what do you think about returning early if paused == nil? It remove that after of the eachOffset "loop", e.g.
if paused == nil {
return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
}
var strip mtmps
f := s.takeBufferedFn(true, func(os usedOffsets) {
os.eachOffset(func(o *cursorOffsetNext) {
if paused.has(o.from.topic, o.from.partition) {
o.from.allowUsable()
strip.add(o.from.topic, o.from.partition)
return
}
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
})
})
—
Reply to this email directly, [view it on GitHub](#601 (review)), or [unsubscribe](https://github.com/notifications/unsubscribe-auth/AAJZIJLBIRMSVB735LAZZQDYAPY7XAVCNFSM6AAAAAA6J2LWHSVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMYTMOJRGI4DMMBQGQ).
You are receiving this because you authored the thread.Message ID: ***@***.***>
|
Although returning even early like your suggestion would avoid a few conditionals in the new version
…On Sat, Oct 21, 2023 at 10:50, Travis Bischel ***@***.***(mailto:On Sat, Oct 21, 2023 at 10:50, Travis Bischel <<a href=)> wrote:
Was changing this as you commented, the newest version is the least allocating and iterating, but a good bit more confusing
On Sat, Oct 21, 2023 at 10:21, dave sinclair ***@***.***(mailto:On Sat, Oct 21, 2023 at 10:21, dave sinclair <<a href=)> wrote:
> @stampy88 commented on this pull request.
>
> ---------------------------------------------------------------
>
> In [pkg/kgo/source.go](#601 (comment)):
>
>> @@ -344,8 +344,38 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {
> }
>
> // takeBuffered drains a buffered fetch and updates offsets.
> -func (s *source) takeBuffered() Fetch {
> - return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
> +func (s *source) takeBuffered(paused pausedTopics) Fetch {
> + var strip mtmps
>
> what do you think about returning early if paused == nil? It remove that after of the eachOffset "loop", e.g.
>
> if paused == nil {
> return s.takeBufferedFn(true, usedOffsets.finishUsingAllWithSet)
> }
>
> var strip mtmps
> f := s.takeBufferedFn(true, func(os usedOffsets) {
> os.eachOffset(func(o *cursorOffsetNext) {
> if paused.has(o.from.topic, o.from.partition) {
> o.from.allowUsable()
> strip.add(o.from.topic, o.from.partition)
> return
> }
> o.from.setOffset(o.cursorOffset)
> o.from.allowUsable()
> })
> })
>
> —
> Reply to this email directly, [view it on GitHub](#601 (review)), or [unsubscribe](https://github.com/notifications/unsubscribe-auth/AAJZIJLBIRMSVB735LAZZQDYAPY7XAVCNFSM6AAAAAA6J2LWHSVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMYTMOJRGI4DMMBQGQ).
> You are receiving this because you authored the thread.Message ID: ***@***.***>
|
var strip map[string]map[int32]struct{} | ||
f := s.takeBufferedFn(true, func(os usedOffsets) { | ||
for t, ps := range os { | ||
// If the entire topic is paused, we allowUsable all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you checked whether the topic was paused some way at the top here, you could get rid of some of the conditionals below. For example (may want to add a method or two to pausedPartitions
if you didn't want to access all
or m
) :
pp, ok := pausedTopics[t]
if !ok {
continue
}
// you know something is paused at this point
if strip == nil {
strip = make(map[string]map[int32]struct{})
}
if pp.all {
for _, o := range ps {
o.from.allowUsable()
}
strip[t] = nil // initialize key, for existence-but-len-0 check below
continue
}
// you know that specific partitions are paused at this point
stript := make(map[int32]struct{})
for _, o := range ps {
if _, ok := pp.m[o.from.partition]; ok {
o.from.allowUsable()
stript[o.from.partition] = struct{}{}
continue
}
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
strip[t] = stript
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not entirely -- it's possible that you paused "foo", but this fetch contains only data for "bar". This block of checking/stripping runs on every fetch response until you unpause everything, but once you pause something, what you paused is also not added to fetch requests. This stripping is only necessary basically for what was in flight at the time you added something to be paused.
edit: Rereading, I think you said something slightly differently from what I read it as.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this logic is better, changing.
Issue #489 asked to stop returning data after a partition was paused -- the original implementation of pausing kept returning any data that was in flight or already buffered, and simply stopped fetching new data. 489 was dealt with by bumping the consumer session, which kills all in flight fetch requests. This was easy, but can cause a lot of connection churn if pausing and resuming a lot -- which is #585. The new implementation allows fetches to complete, but strips data from fetches based on what is paused at the moment the fetches are being returned to the client. This does make polling paused fetches very slightly slower (a map lookup per partition), but there's only so much that's possible. If a partition is paused, we drop the data and do not advance the internal offset. If a partition is not paused, we keep the data and return it -- same as before.
Issue #489 asked to stop returning data after a partition was paused -- the original implementation of pausing kept returning any data that was in flight or already buffered, and simply stopped fetching new data.
489 was dealt with by bumping the consumer session, which kills all in flight fetch requests. This was easy, but can cause a lot of connection churn if pausing and resuming a lot -- which is #585.
The new implementation allows fetches to complete, but strips data from fetches based on what is paused at the moment the fetches are being returned to the client. This does make polling paused fetches very slightly slower (a map lookup per partition), but there's only so much that's possible. If a partition is paused, we drop the data and do not advance the internal offset. If a partition is not paused, we keep the data and return it -- same as before.
Closes #585.