diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go
index 05167c72..fb622f41 100644
--- a/pkg/kfake/cluster.go
+++ b/pkg/kfake/cluster.go
@@ -29,6 +29,8 @@ type (
 		controller *broker
 		bs         []*broker
 
+		coordinatorGen atomic.Uint64
+
 		adminCh chan func()
 		reqCh   chan *clientReq
 		wakeCh  chan *slept
@@ -64,6 +66,7 @@ type (
 		key  int16
 		fn   controlFn
 		keep bool
+		drop bool
 
 		lastReq map[*clientConn]*clientReq // used to not re-run requests that slept, see doc comments below
 	}
@@ -254,6 +257,7 @@ func (b *broker) listen() {
 }
 
 func (c *Cluster) run() {
+outer:
 	for {
 		var (
 			creq *clientReq
@@ -270,11 +274,13 @@ func (c *Cluster) run() {
 			return
 
 		case admin := <-c.adminCh:
-			// Run a custom request in the context of the cluster.
 			admin()
 			continue
 
 		case creq = <-c.reqCh:
+			if c.cfg.sleepOutOfOrder {
+				break
+			}
 			// If we have any sleeping request on this node,
 			// we enqueue the new live request to the end and
 			// wait for the sleeping request to finish.
@@ -301,19 +307,28 @@ func (c *Cluster) run() {
 			// Control flow is weird here, but is described more
 			// fully in the finish/resleep/etc methods.
 			c.continueSleptControl(s)
-			select {
-			case res := <-s.res:
-				c.finishSleptControl(s)
-				cctx := s.cctx
-				s = nil
-				kresp, err, handled = res.kresp, res.err, res.handled
-				if handled {
-					c.popControl(cctx)
-					goto afterControl
+		inner:
+			for {
+				select {
+				case <-c.die:
+					return
+				case admin := <-c.adminCh:
+					admin()
+					continue inner
+				case res := <-s.res:
+					c.finishSleptControl(s)
+					cctx := s.cctx
+					s = nil
+					kresp, err, handled = res.kresp, res.err, res.handled
+					c.maybePopControl(handled, cctx)
+					if handled {
+						goto afterControl
+					}
+					break inner
+				case sleepChs := <-c.controlSleep:
+					c.resleepSleptControl(s, sleepChs)
+					continue outer
 				}
-			case sleepChs := <-c.controlSleep:
-				c.resleepSleptControl(s, sleepChs)
-				continue
 			}
 
 		case w = <-c.watchFetchCh:
@@ -435,10 +450,15 @@
 // Controlling a request drops the control function from the cluster, meaning
 // that a control function can only control *one* request. To keep the control
 // function handling more requests, you can call KeepControl within your
-// control function.
+// control function. Alternatively, if you just want to run some logic in your
+// control function and then have the cluster handle the request as normal,
+// you can call DropControl to drop a control function that was not handled.
+//
+// It is safe to add new control functions within a control function.
 //
-// It is safe to add new control functions within a control function. Control
-// functions are not called concurrently.
+// Control functions are run serially unless you use SleepControl, multiple
+// control functions are "in progress" (sleeping), and you then run Cluster.Close:
+// closing a Cluster awakens all sleeping control functions at once.
 func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) {
 	c.ControlKey(-1, fn)
 }
@@ -455,9 +475,15 @@ func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) {
 // Controlling a request drops the control function from the cluster, meaning
 // that a control function can only control *one* request. To keep the control
 // function handling more requests, you can call KeepControl within your
-// control function.
+// control function. Alternatively, if you just want to run some logic in your
+// control function and then have the cluster handle the request as normal,
+// you can call DropControl to drop a control function that was not handled.
 //
 // It is safe to add new control functions within a control function.
+//
+// Control functions are run serially unless you use SleepControl, multiple
+// control functions are "in progress" (sleeping), and you then run Cluster.Close:
+// closing a Cluster awakens all sleeping control functions at once.
 func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool)) {
 	c.controlMu.Lock()
 	defer c.controlMu.Unlock()
@@ -484,6 +510,19 @@ func (c *Cluster) KeepControl() {
 	}
 }
 
+// DropControl allows you to drop the current control function. This takes
+// precedence over KeepControl. This function is useful if you want to run
+// custom control logic *once*, drop the control function, and return that the
+// request was not handled -- thus allowing other control functions to run, or
+// allowing the kfake cluster to process the request as normal.
+func (c *Cluster) DropControl() {
+	c.controlMu.Lock()
+	defer c.controlMu.Unlock()
+	if c.currentControl != nil {
+		c.currentControl.drop = true
+	}
+}
+
 // SleepControl sleeps the current control function until wakeup returns. This
 // yields to run any other connection.
 //
@@ -525,7 +564,10 @@ func (c *Cluster) SleepControl(wakeup func()) {
 	}()
 
 	c.controlSleep <- sleepChs
-	<-sleepChs.clientCont
+	select {
+	case <-sleepChs.clientCont:
+	case <-c.die:
+	}
 }
 
 // CurrentNode is solely valid from within a control function; it returns
@@ -560,20 +602,25 @@ func (c *Cluster) tryControlKey(key int16, creq *clientReq) (kmsg.Response, erro
 		}
 		cctx.lastReq[creq.cc] = creq
 		res := c.runControl(cctx, creq)
-		select {
-		case res := <-res:
-			if res.handled {
-				c.popControl(cctx)
-				return res.kresp, res.err, true
+		for {
+			select {
+			case <-c.die:
+				return nil, nil, false
+			case admin := <-c.adminCh:
+				admin()
+				continue
+			case res := <-res:
+				c.maybePopControl(res.handled, cctx)
+				return res.kresp, res.err, res.handled
+			case sleepChs := <-c.controlSleep:
+				c.beginSleptControl(&slept{
+					cctx:     cctx,
+					sleepChs: sleepChs,
+					res:      res,
+					creq:     creq,
+				})
+				return nil, nil, true
 			}
-		case sleepChs := <-c.controlSleep:
-			c.beginSleptControl(&slept{
-				cctx:     cctx,
-				sleepChs: sleepChs,
-				res:      res,
-				creq:     creq,
-			})
-			return nil, nil, true
 		}
 	}
 	return nil, nil, false
@@ -606,7 +653,11 @@ func (c *Cluster) beginSleptControl(s *slept) {
 	// unlock us safely.
 	bs := c.sleeping[s.creq.cc]
 	if bs == nil {
-		bs = &bsleep{c: c}
+		bs = &bsleep{
+			c:       c,
+			set:     make(map[*slept]struct{}),
+			setWake: make(chan *slept, 1),
+		}
 		c.sleeping[s.creq.cc] = bs
 	}
 	bs.enqueue(s)
@@ -648,10 +699,13 @@ func (c *Cluster) resleepSleptControl(s *slept, sleepChs sleepChs) {
 	c.controlMu.Unlock()
 	s.sleepChs = sleepChs
 	s.continueDequeue <- struct{}{}
+	// For OOO requests, we need to manually trigger a goroutine to
+	// watch for the sleep to end.
+	s.bs.maybeWaitOOOWake(s)
 }
 
-func (c *Cluster) popControl(cctx *controlCtx) {
-	if !cctx.keep {
+func (c *Cluster) maybePopControl(handled bool, cctx *controlCtx) {
+	if handled && !cctx.keep || cctx.drop {
 		delete(c.control[cctx.key], cctx)
 	}
 }
@@ -659,12 +713,15 @@ func (c *Cluster) popControl(cctx *controlCtx) {
 // bsleep manages sleeping requests on a connection to a broker, or
 // non-sleeping requests that are waiting for sleeping requests to finish.
 type bsleep struct {
-	c     *Cluster
-	mu    sync.Mutex
-	queue []*slept
+	c       *Cluster
+	mu      sync.Mutex
+	queue   []*slept
+	set     map[*slept]struct{}
+	setWake chan *slept
 }
 
 type slept struct {
+	bs       *bsleep
 	cctx     *controlCtx
 	sleepChs sleepChs
 	res      <-chan controlResp
@@ -681,45 +738,60 @@ type sleepChs struct {
 // enqueue has a few potential behaviors.
 //
-// * If s is waiting, this is a new request enqueueing to the back of an
+// (1) If s is waiting, this is a new request enqueueing to the back of an
 // existing queue, where we are waiting for the head request to finish
 // sleeping. Easy case.
 //
-// * If s is not waiting, this is a sleeping request. If the queue is empty,
+// (2) If s is not waiting, this is a sleeping request. If the queue is empty,
 // this is the first sleeping request on a node. We enqueue and start our wait
 // goroutine. Easy.
 //
-// * If s is not waiting, but our queue is non-empty, this must be from a
-// convoluted scenario: There was a request in front of us that slept, and we
-// were waiting, and now we ourselves slept OR we previously slept, but we
-// returned "not handled". We are now re-enqueueing ourself. Rather than add to
-// the back, we update our head request with the new enqueued values. In this
-// last case, bsleep is actually waiting for a signal down 'continueDequeue',
-// and it will be signaled in the 'run' goroutine once tryControl returns
-// (which it will, right after we are done here). We need to update values on
-// the head.
+// (3) If s is not waiting, but our queue is non-empty, this must be from a
+// convoluted scenario:
+//
+//	(a) the user has SleepOutOfOrder configured,
+//	(b) or, there was a request in front of us that slept, we were waiting,
+//	and now we ourselves are sleeping
+//	(c) or, we are sleeping for the second time in a single control function
 func (bs *bsleep) enqueue(s *slept) bool {
 	if bs == nil {
-		return false
+		return false // Do not enqueue, nothing sleeping
 	}
+	s.continueDequeue = make(chan struct{}, 1)
+	s.bs = bs
 	bs.mu.Lock()
 	defer bs.mu.Unlock()
 	if s.waiting {
-		if len(bs.queue) > 0 {
-			bs.queue = append(bs.queue, s)
+		if bs.c.cfg.sleepOutOfOrder {
+			panic("enqueueing a waiting request even though we are sleeping out of order")
+		}
+		if !bs.empty() {
+			bs.keep(s) // Case (1)
 			return true
 		}
-		return false
+		return false // We do not enqueue, do not wait: nothing sleeping ahead of us
 	}
-	if len(bs.queue) == 0 {
-		bs.queue = append(bs.queue, s)
-		go bs.wait()
+	if bs.empty() {
+		bs.keep(s)
+		go bs.wait() // Case (2)
+		return true
+	}
+	var q0 *slept
+	if !bs.c.cfg.sleepOutOfOrder {
+		q0 = bs.queue[0] // Case (3b) or (3c) -- just update values below
+	} else {
+		// Case (3a), out of order sleep: we need to check the entire
+		// queue to see if this request was already sleeping and, if
+		// so, update the values. If it was not already sleeping, we
+		// "keep" the new sleeping item.
+		bs.keep(s)
 		return true
 	}
-	q0 := bs.queue[0]
 	if q0.creq != s.creq {
 		panic("internal error: sleeping request not head request")
 	}
+	// We do not update continueDequeue because it is actively being read;
+	// we just reuse the old value.
 	q0.cctx = s.cctx
 	q0.sleepChs = s.sleepChs
 	q0.res = s.res
@@ -727,23 +799,111 @@ func (bs *bsleep) enqueue(s *slept) bool {
 	return true
 }
 
+// keep stores a sleeping request to be managed. For out of order control, the
+// logic is a bit more complicated and we need to watch for the control sleep
+// finishing here, and forward the "I'm done sleeping" notification to waitSet.
+func (bs *bsleep) keep(s *slept) {
+	if !bs.c.cfg.sleepOutOfOrder {
+		bs.queue = append(bs.queue, s)
+		return
+	}
+	bs.set[s] = struct{}{}
+	bs.maybeWaitOOOWake(s)
+}
+
+func (bs *bsleep) maybeWaitOOOWake(s *slept) {
+	if !bs.c.cfg.sleepOutOfOrder {
+		return
+	}
+	go func() {
+		select {
+		case <-bs.c.die:
+		case <-s.sleepChs.clientWait:
+			select {
+			case <-bs.c.die:
+			case bs.setWake <- s:
+			}
+		}
+	}()
+}
+
+func (bs *bsleep) empty() bool {
+	return len(bs.queue) == 0 && len(bs.set) == 0
+}
+
 func (bs *bsleep) wait() {
+	if bs.c.cfg.sleepOutOfOrder {
+		bs.waitSet()
+	} else {
+		bs.waitQueue()
+	}
+}
+
+// For out of order control, all control functions run concurrently, not
+// serially. Whenever they wake up, they send themselves down setWake. waitSet
+// handles the wakeup and interacts with the cluster's serial run goroutine to
+// run everything properly.
+func (bs *bsleep) waitSet() {
 	for {
 		bs.mu.Lock()
-		if len(bs.queue) == 0 {
+		if len(bs.set) == 0 {
 			bs.mu.Unlock()
 			return
 		}
-		q0 := bs.queue[0]
 		bs.mu.Unlock()
 
-		if q0.continueDequeue == nil {
-			q0.continueDequeue = make(chan struct{}, 1)
+		// Wait for a control function to awaken.
+		var q *slept
+		select {
+		case <-bs.c.die:
+			return
+		case q = <-bs.setWake:
+			q.sleepChs.clientWait = nil
 		}
+
+		// Now, schedule ourselves with the run loop.
+		select {
+		case <-bs.c.die:
+			return
+		case bs.c.wakeCh <- q:
+		}
+
+		// Wait for this control function to finish its loop in the run
+		// function. Once it does, if clientWait is non-nil, the
+		// control function went back to sleep. If it is nil, the
+		// control function is done and we remove this from tracking.
+		select {
+		case <-bs.c.die:
+			return
+		case <-q.continueDequeue:
+		}
+		if q.sleepChs.clientWait == nil {
+			bs.mu.Lock()
+			delete(bs.set, q)
+			bs.mu.Unlock()
+		}
+	}
+}
+
+// For in-order control functions, the concept is slightly simpler but the
+// logic flow is the same. We wait for the head control function to wake up,
+// try to run it, and then wait for it to finish. The logic of this function is
+// the same as waitSet, minus the middle part where we wait for something to
+// wake up.
+func (bs *bsleep) waitQueue() {
+	for {
+		bs.mu.Lock()
+		if len(bs.queue) == 0 {
+			bs.mu.Unlock()
+			return
+		}
+		q0 := bs.queue[0]
+		bs.mu.Unlock()
+
+		if q0.sleepChs.clientWait != nil {
 			select {
 			case <-bs.c.die:
+				return
 			case <-q0.sleepChs.clientWait:
 				q0.sleepChs.clientWait = nil
 			}
@@ -755,7 +915,11 @@
 		case bs.c.wakeCh <- q0:
 		}
 
-		<-q0.continueDequeue
+		select {
+		case <-bs.c.die:
+			return
+		case <-q0.continueDequeue:
+		}
 		if q0.sleepChs.clientWait == nil {
 			bs.mu.Lock()
 			bs.queue = bs.queue[1:]
@@ -802,6 +966,28 @@ func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32
 	return err
 }
 
+// CoordinatorFor returns the node ID of the group or transaction coordinator
+// for the given key.
+func (c *Cluster) CoordinatorFor(key string) int32 {
+	var n int32
+	c.admin(func() {
+		l := len(c.bs)
+		if l == 0 {
+			n = -1
+			return
+		}
+		n = c.coordinator(key).node
+	})
+	return n
+}
+
+// RehashCoordinators simulates group and transactional ID coordinators moving
+// around. All group and transactional IDs are rekeyed. This forces clients to
+// reload coordinators.
+func (c *Cluster) RehashCoordinators() {
+	c.coordinatorGen.Add(1)
+}
+
 // AddNode adds a node to the cluster. If nodeID is -1, the next node ID is
 // used. If port is 0 or negative, a random port is chosen. This returns the
 // added node ID and the port used, or an error if the node already exists or
diff --git a/pkg/kfake/config.go b/pkg/kfake/config.go
index d0f01106..75b34fb2 100644
--- a/pkg/kfake/config.go
+++ b/pkg/kfake/config.go
@@ -34,6 +34,8 @@ type cfg struct {
 	enableSASL bool
 	sasls      map[struct{ m, u string }]string // cleared after client initialization
 	tls        *tls.Config
+
+	sleepOutOfOrder bool
 }
 
 // NumBrokers sets the number of brokers to start in the fake cluster.
@@ -113,3 +115,12 @@ func TLS(c *tls.Config) Opt {
 func SeedTopics(partitions int32, ts ...string) Opt {
 	return opt{func(cfg *cfg) { cfg.seedTopics = append(cfg.seedTopics, seedTopics{partitions, ts}) }}
}
+
+// SleepOutOfOrder allows requests to be handled out of order when control
+// functions are sleeping. The requests are handled internally out of
+// order, but responses still wait for the sleeping requests to finish. This
+// can be used to set up complicated chains of control where functions only
+// advance when you know another request is actively being handled.
+func SleepOutOfOrder() Opt {
+	return opt{func(cfg *cfg) { cfg.sleepOutOfOrder = true }}
+}
diff --git a/pkg/kfake/groups.go b/pkg/kfake/groups.go
index 853c3d68..a0e5a98e 100644
--- a/pkg/kfake/groups.go
+++ b/pkg/kfake/groups.go
@@ -101,7 +101,8 @@ func (gs groupState) String() string {
 }
 
 func (c *Cluster) coordinator(id string) *broker {
-	n := hashString(id) % uint64(len(c.bs))
+	gen := c.coordinatorGen.Load()
+	n := hashString(fmt.Sprintf("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs))
 	return c.bs[n]
 }
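
A few usage sketches for the new APIs in this diff. First, DropControl: the sketch below runs custom logic for exactly one Metadata request, drops the control function, and returns "not handled" so kfake processes the request normally. This is a hypothetical test body, not code from the diff; it assumes the existing kfake.NewCluster, kfake.NumBrokers, and kmsg key constants as they exist in this repo.

```go
package kfake_test

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// demoDropControl is a hypothetical sketch: observe one Metadata request,
// then drop the control function so kfake handles this request (and all
// later Metadata requests) normally.
func demoDropControl() {
	c, err := kfake.NewCluster(kfake.NumBrokers(3))
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.ControlKey(kmsg.Metadata.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
		fmt.Println("saw one Metadata request")
		c.DropControl()        // drop after this invocation; takes precedence over KeepControl
		return nil, nil, false // "not handled": the cluster processes the request as normal
	})
}
```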
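Second, SleepControl with SleepOutOfOrder: a sleeping control function normally blocks later requests on the same connection, so SleepOutOfOrder is what lets a later request be handled while an earlier one sleeps. The ordering below (park Produce until a Fetch is seen) is an illustrative scenario, not from this diff:

```go
package kfake_test

import (
	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// demoSleepOutOfOrder is a hypothetical sketch: park any Produce request
// until a Fetch request has been seen. Without SleepOutOfOrder, the sleeping
// Produce control function would block later requests on its connection, so
// a Fetch arriving on that connection could never be handled first.
func demoSleepOutOfOrder() (*kfake.Cluster, error) {
	c, err := kfake.NewCluster(kfake.SleepOutOfOrder())
	if err != nil {
		return nil, err
	}

	fetched := make(chan struct{})
	c.ControlKey(kmsg.Produce.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
		c.DropControl()
		c.SleepControl(func() { <-fetched }) // yield; other requests keep flowing
		return nil, nil, false               // once awake, handled as normal
	})
	c.ControlKey(kmsg.Fetch.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
		c.DropControl()
		close(fetched) // wake the sleeping Produce control function
		return nil, nil, false
	})
	return c, nil
}
```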
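Finally, the coordinator changes: CoordinatorFor reports where a group or transactional ID currently lives, and RehashCoordinators bumps the generation that coordinator() mixes into its hash, so IDs remap across brokers. A tiny illustrative poke, reusing a multi-broker cluster c from the sketches above (the group name is made up; with the rehash being just a rekeyed hash, the node may or may not actually change):

```go
// Hypothetical: check whether "my-group" moved after a rehash.
before := c.CoordinatorFor("my-group")
c.RehashCoordinators() // bump the generation mixed into the coordinator hash
after := c.CoordinatorFor("my-group")
fmt.Printf("coordinator for my-group: %d -> %d (moved=%v)\n", before, after, before != after)
```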