Skip to content

Commit

Permalink
Merge pull request #540 from axw/kfake-fix-keepcontrol
Browse files Browse the repository at this point in the history
pkg/kfake: fix KeepControl
  • Loading branch information
twmb authored Aug 24, 2023
2 parents 57ea8a8 + 731adb1 commit a1a2a45
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions pkg/kfake/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,20 +413,21 @@ func (c *Cluster) tryControl(kreq kmsg.Request, b *broker) (kresp kmsg.Response,
if len(c.control) == 0 {
return nil, nil, false
}

keyFns := c.control[kreq.Key()]
for i, fn := range keyFns {
kresp, err, handled = c.callControl(kreq.Key(), kreq, fn)
if handled {
c.control[kreq.Key()] = append(keyFns[:i], keyFns[i+1:]...)
return
}
kresp, err, handled = c.tryControlKey(kreq.Key(), kreq, b)
if !handled {
kresp, err, handled = c.tryControlKey(-1, kreq, b)
}
anyFns := c.control[-1]
for i, fn := range anyFns {
kresp, err, handled = c.callControl(-1, kreq, fn)
return kresp, err, handled
}

func (c *Cluster) tryControlKey(key int16, kreq kmsg.Request, b *broker) (kresp kmsg.Response, err error, handled bool) {
for i, fn := range c.control[key] {
kresp, err, handled = c.callControl(key, kreq, fn)
if handled {
c.control[-1] = append(anyFns[:i], anyFns[i+1:]...)
// fn may have called Control, ControlKey, or KeepControl,
// all of which will append to c.control; refresh the slice.
fns := c.control[key]
c.control[key] = append(fns[:i], fns[i+1:]...)
return
}
}
Expand Down

0 comments on commit a1a2a45

Please sign in to comment.