From 3226dac0355cd7345a7fa38f58c59537c5f4108b Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 16 Dec 2023 20:05:38 -0700 Subject: [PATCH] kfake: add SleepControl This function allows you to sleep a function you are controlling until your wakeup function returns. The control function effectively yields to other requests. Note that since requests must be handled in order, you need to be a bit careful to not block other requests (unless you intentionally do that). This basically does what it says on the tin. The behavior of everything else is unchanged -- you can KeepControl, you can return false to say it wasn't handled, etc. The logic and control flow is a good bit ugly, but it works and is fairly documented and "well contained". In working on this, I also found and fixed a bug that resulted in correllation errors when handling join&sync. kgo group tests still work against kfake's "hidden" main.go, and I have tested SleepControl with/without KeepControl, and with/without returning handled=true. --- pkg/kfake/01_fetch.go | 4 +- pkg/kfake/08_offset_commit.go | 2 +- pkg/kfake/09_offset_fetch.go | 2 +- pkg/kfake/11_join_group.go | 2 +- pkg/kfake/12_heartbeat.go | 2 +- pkg/kfake/13_leave_group.go | 2 +- pkg/kfake/14_sync_group.go | 2 +- pkg/kfake/15_describe_groups.go | 2 +- pkg/kfake/16_list_groups.go | 2 +- pkg/kfake/17_sasl_handshake.go | 2 +- pkg/kfake/36_sasl_authenticate.go | 2 +- pkg/kfake/42_delete_groups.go | 2 +- pkg/kfake/47_offset_delete.go | 2 +- pkg/kfake/client_conn.go | 9 +- pkg/kfake/cluster.go | 411 ++++++++++++++++++++++++++---- pkg/kfake/groups.go | 54 ++-- pkg/kfake/sasl.go | 2 +- 17 files changed, 412 insertions(+), 92 deletions(-) diff --git a/pkg/kfake/01_fetch.go b/pkg/kfake/01_fetch.go index 08053d1e..f0fc4f07 100644 --- a/pkg/kfake/01_fetch.go +++ b/pkg/kfake/01_fetch.go @@ -18,7 +18,7 @@ import ( func init() { regKey(1, 4, 13) } -func (c *Cluster) handleFetch(creq clientReq, w *watchFetch) (kmsg.Response, error) { +func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) { var ( req = creq.kreq.(*kmsg.FetchRequest) resp = req.ResponseKind().(*kmsg.FetchResponse) @@ -182,7 +182,7 @@ type watchFetch struct { need int needp tps[int] deadline time.Time - creq clientReq + creq *clientReq in []*partData cb func() diff --git a/pkg/kfake/08_offset_commit.go b/pkg/kfake/08_offset_commit.go index 69e9dab7..692f4cc0 100644 --- a/pkg/kfake/08_offset_commit.go +++ b/pkg/kfake/08_offset_commit.go @@ -7,7 +7,7 @@ import ( func init() { regKey(8, 0, 8) } -func (c *Cluster) handleOffsetCommit(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) diff --git a/pkg/kfake/09_offset_fetch.go b/pkg/kfake/09_offset_fetch.go index 24047d4f..204339ec 100644 --- a/pkg/kfake/09_offset_fetch.go +++ b/pkg/kfake/09_offset_fetch.go @@ -6,7 +6,7 @@ import ( func init() { regKey(9, 0, 8) } -func (c *Cluster) handleOffsetFetch(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleOffsetFetch(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetFetchRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/11_join_group.go b/pkg/kfake/11_join_group.go index ef537785..70ed1d89 100644 --- a/pkg/kfake/11_join_group.go +++ b/pkg/kfake/11_join_group.go @@ -6,7 +6,7 @@ import ( func init() { regKey(11, 0, 9) } -func (c *Cluster) handleJoinGroup(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleJoinGroup(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.JoinGroupRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/12_heartbeat.go b/pkg/kfake/12_heartbeat.go index b8e1a188..59f12712 100644 --- a/pkg/kfake/12_heartbeat.go +++ b/pkg/kfake/12_heartbeat.go @@ -7,7 +7,7 @@ import ( func init() { regKey(12, 0, 4) } -func (c *Cluster) handleHeartbeat(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleHeartbeat(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.HeartbeatRequest) resp := req.ResponseKind().(*kmsg.HeartbeatResponse) diff --git a/pkg/kfake/13_leave_group.go b/pkg/kfake/13_leave_group.go index 9f17ec6c..e941f19e 100644 --- a/pkg/kfake/13_leave_group.go +++ b/pkg/kfake/13_leave_group.go @@ -7,7 +7,7 @@ import ( func init() { regKey(13, 0, 5) } -func (c *Cluster) handleLeaveGroup(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleLeaveGroup(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.LeaveGroupRequest) resp := req.ResponseKind().(*kmsg.LeaveGroupResponse) diff --git a/pkg/kfake/14_sync_group.go b/pkg/kfake/14_sync_group.go index 07ea0538..9944b112 100644 --- a/pkg/kfake/14_sync_group.go +++ b/pkg/kfake/14_sync_group.go @@ -7,7 +7,7 @@ import ( func init() { regKey(14, 0, 5) } -func (c *Cluster) handleSyncGroup(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleSyncGroup(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.SyncGroupRequest) resp := req.ResponseKind().(*kmsg.SyncGroupResponse) diff --git a/pkg/kfake/15_describe_groups.go b/pkg/kfake/15_describe_groups.go index 1152bcf8..8791759b 100644 --- a/pkg/kfake/15_describe_groups.go +++ b/pkg/kfake/15_describe_groups.go @@ -6,7 +6,7 @@ import ( func init() { regKey(15, 0, 5) } -func (c *Cluster) handleDescribeGroups(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleDescribeGroups(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.DescribeGroupsRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/16_list_groups.go b/pkg/kfake/16_list_groups.go index e254284b..6d0189c4 100644 --- a/pkg/kfake/16_list_groups.go +++ b/pkg/kfake/16_list_groups.go @@ -6,7 +6,7 @@ import ( func init() { regKey(16, 0, 4) } -func (c *Cluster) handleListGroups(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleListGroups(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.ListGroupsRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/17_sasl_handshake.go b/pkg/kfake/17_sasl_handshake.go index 34028bc8..8a80cbf1 100644 --- a/pkg/kfake/17_sasl_handshake.go +++ b/pkg/kfake/17_sasl_handshake.go @@ -7,7 +7,7 @@ import ( func init() { regKey(17, 1, 1) } -func (c *Cluster) handleSASLHandshake(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleSASLHandshake(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.SASLHandshakeRequest) resp := req.ResponseKind().(*kmsg.SASLHandshakeResponse) diff --git a/pkg/kfake/36_sasl_authenticate.go b/pkg/kfake/36_sasl_authenticate.go index ec0e2a97..b94d2f01 100644 --- a/pkg/kfake/36_sasl_authenticate.go +++ b/pkg/kfake/36_sasl_authenticate.go @@ -9,7 +9,7 @@ import ( func init() { regKey(36, 0, 2) } -func (c *Cluster) handleSASLAuthenticate(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleSASLAuthenticate(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.SASLAuthenticateRequest) resp := req.ResponseKind().(*kmsg.SASLAuthenticateResponse) diff --git a/pkg/kfake/42_delete_groups.go b/pkg/kfake/42_delete_groups.go index e820b94f..68257415 100644 --- a/pkg/kfake/42_delete_groups.go +++ b/pkg/kfake/42_delete_groups.go @@ -6,7 +6,7 @@ import ( func init() { regKey(42, 0, 2) } -func (c *Cluster) handleDeleteGroups(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleDeleteGroups(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.DeleteGroupsRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/47_offset_delete.go b/pkg/kfake/47_offset_delete.go index 900b8bee..878e83b4 100644 --- a/pkg/kfake/47_offset_delete.go +++ b/pkg/kfake/47_offset_delete.go @@ -7,7 +7,7 @@ import ( func init() { regKey(47, 0, 0) } -func (c *Cluster) handleOffsetDelete(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleOffsetDelete(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetDeleteRequest) resp := req.ResponseKind().(*kmsg.OffsetDeleteResponse) diff --git a/pkg/kfake/client_conn.go b/pkg/kfake/client_conn.go index 2d6b89f1..a78b574f 100644 --- a/pkg/kfake/client_conn.go +++ b/pkg/kfake/client_conn.go @@ -25,8 +25,8 @@ type ( cc *clientConn kreq kmsg.Request at time.Time - corr int32 cid string + corr int32 seq uint32 } @@ -38,7 +38,7 @@ type ( } ) -func (creq clientReq) empty() bool { return creq.cc == nil || creq.kreq == nil } +func (creq *clientReq) empty() bool { return creq == nil || creq.cc == nil || creq.kreq == nil } func (cc *clientConn) read() { defer cc.conn.Close() @@ -101,7 +101,7 @@ func (cc *clientConn) read() { } select { - case cc.c.reqCh <- clientReq{cc, kreq, time.Now(), corr, cid, seq}: + case cc.c.reqCh <- &clientReq{cc, kreq, time.Now(), cid, corr, seq}: seq++ case <-cc.c.die: return @@ -141,6 +141,9 @@ func (cc *clientConn) write() { case <-cc.c.die: return } + } else { + delete(oooresp, seq) + seq++ } if err := resp.err; err != nil { cc.c.cfg.logger.Logf(LogLevelInfo, "client %s request unable to be handled: %v", who, err) diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go index 0fc46a8d..05167c72 100644 --- a/pkg/kfake/cluster.go +++ b/pkg/kfake/cluster.go @@ -30,13 +30,16 @@ type ( bs []*broker adminCh chan func() - reqCh chan clientReq + reqCh chan *clientReq + wakeCh chan *slept watchFetchCh chan *watchFetch - controlMu sync.Mutex - control map[int16][]controlFn - keepCurrentControl atomic.Bool - currentBroker atomic.Pointer[broker] + controlMu sync.Mutex + control map[int16]map[*controlCtx]struct{} + currentBroker *broker + currentControl *controlCtx + sleeping map[*clientConn]*bsleep + controlSleep chan sleepChs data data pids pids @@ -56,6 +59,19 @@ type ( } controlFn func(kmsg.Request) (kmsg.Response, error, bool) + + controlCtx struct { + key int16 + fn controlFn + keep bool + lastReq map[*clientConn]*clientReq // used to not re-run requests that slept, see doc comments below + } + + controlResp struct { + kresp kmsg.Response + err error + handled bool + } ) // MustCluster is like NewCluster, but panics on error. @@ -91,9 +107,13 @@ func NewCluster(opts ...Opt) (*Cluster, error) { cfg: cfg, adminCh: make(chan func()), - reqCh: make(chan clientReq, 20), + reqCh: make(chan *clientReq, 20), + wakeCh: make(chan *slept, 10), watchFetchCh: make(chan *watchFetch, 20), - control: make(map[int16][]controlFn), + control: make(map[int16]map[*controlCtx]struct{}), + controlSleep: make(chan sleepChs, 1), + + sleeping: make(map[*clientConn]*bsleep), data: data{ id2t: make(map[uuid]string), @@ -235,27 +255,76 @@ func (b *broker) listen() { func (c *Cluster) run() { for { - var creq clientReq - var w *watchFetch + var ( + creq *clientReq + w *watchFetch + s *slept + kreq kmsg.Request + kresp kmsg.Response + err error + handled bool + ) select { + case <-c.die: + return + + case admin := <-c.adminCh: + // Run a custom request in the context of the cluster. + admin() + continue + case creq = <-c.reqCh: + // If we have any sleeping request on this node, + // we enqueue the new live request to the end and + // wait for the sleeping request to finish. + bs := c.sleeping[creq.cc] + if bs.enqueue(&slept{ + creq: creq, + waiting: true, + }) { + continue + } + + case s = <-c.wakeCh: + // On wakeup, we know we are handling a control + // function that was slept, or a request that was + // waiting for a control function to finish sleeping. + creq = s.creq + if s.waiting { + break + } + + // We continue a previously sleeping request, and + // handle results similar to tryControl. + // + // Control flow is weird here, but is described more + // fully in the finish/resleep/etc methods. + c.continueSleptControl(s) + select { + case res := <-s.res: + c.finishSleptControl(s) + cctx := s.cctx + s = nil + kresp, err, handled = res.kresp, res.err, res.handled + if handled { + c.popControl(cctx) + goto afterControl + } + case sleepChs := <-c.controlSleep: + c.resleepSleptControl(s, sleepChs) + continue + } + case w = <-c.watchFetchCh: if w.cleaned { continue // already cleaned up, this is an extraneous timer fire } w.cleanup(c) creq = w.creq - case <-c.die: - return - case fn := <-c.adminCh: - // Run a custom request in the context of the cluster - fn() - continue } - kreq := creq.kreq - kresp, err, handled := c.tryControl(kreq, creq.cc.b) + kresp, err, handled = c.tryControl(creq) if handled { goto afterControl } @@ -267,6 +336,7 @@ func (c *Cluster) run() { } } + kreq = creq.kreq switch k := kmsg.Key(kreq.Key()); k { case kmsg.Produce: kresp, err = c.handleProduce(creq.cc.b, kreq) @@ -331,11 +401,18 @@ func (c *Cluster) run() { case kmsg.AlterUserSCRAMCredentials: kresp, err = c.handleAlterUserSCRAMCredentials(creq.cc.b, kreq) default: - err = fmt.Errorf("unahndled key %v", k) + err = fmt.Errorf("unhandled key %v", k) } afterControl: - if kresp == nil && err == nil { // produce request with no acks, or hijacked group request + // If s is non-nil, this is either a previously slept control + // that finished but was not handled, or a previously slept + // waiting request. In either case, we need to signal to the + // sleep dequeue loop to continue. + if s != nil { + s.continueDequeue <- struct{}{} + } + if kresp == nil && err == nil { // produce request with no acks, or otherwise hijacked request (group, sleep) continue } @@ -363,9 +440,7 @@ func (c *Cluster) run() { // It is safe to add new control functions within a control function. Control // functions are not called concurrently. func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) { - c.controlMu.Lock() - defer c.controlMu.Unlock() - c.control[-1] = append(c.control[-1], fn) + c.ControlKey(-1, fn) } // Control is a function to call on a specific request key that the cluster @@ -386,65 +461,307 @@ func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) { func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool)) { c.controlMu.Lock() defer c.controlMu.Unlock() - c.control[key] = append(c.control[key], fn) + m := c.control[key] + if m == nil { + m = make(map[*controlCtx]struct{}) + c.control[key] = m + } + m[&controlCtx{ + key: key, + fn: fn, + lastReq: make(map[*clientConn]*clientReq), + }] = struct{}{} } // KeepControl marks the currently running control function to be kept even if // you handle the request and return true. This can be used to continuously // control requests without needing to re-add control functions manually. func (c *Cluster) KeepControl() { - c.keepCurrentControl.Swap(true) + c.controlMu.Lock() + defer c.controlMu.Unlock() + if c.currentControl != nil { + c.currentControl.keep = true + } +} + +// SleepControl sleeps the current control function until wakeup returns. This +// yields to run any other connection. +// +// Note that per protocol, requests on the same connection must be replied to +// in order. Many clients write multiple requests to the same connection, so +// if you sleep until a different request runs, you may sleep forever -- you +// must know the semantics of your client to know whether requests run on +// different connections (or, ensure you are writing to different brokers). +// +// For example, franz-go uses a dedicated connection for: +// - produce requests +// - fetch requests +// - join&sync requests +// - requests with a Timeout field +// - all other request +// +// So, for franz-go, there are up to five separate connections depending +// on what you are doing. +// +// You can run SleepControl multiple times in the same control function. If you +// sleep a request you are controlling, and another request of the same key +// comes in, it will run the same control function and may also sleep (i.e., +// you must have logic if you want to avoid sleeping on the same request). +func (c *Cluster) SleepControl(wakeup func()) { + c.controlMu.Lock() + if c.currentControl == nil { + c.controlMu.Unlock() + return + } + c.controlMu.Unlock() + + sleepChs := sleepChs{ + clientWait: make(chan struct{}, 1), + clientCont: make(chan struct{}, 1), + } + go func() { + wakeup() + sleepChs.clientWait <- struct{}{} + }() + + c.controlSleep <- sleepChs + <-sleepChs.clientCont } // CurrentNode is solely valid from within a control function; it returns // the broker id that the request was received by. // If there's no request currently inflight, this returns -1. func (c *Cluster) CurrentNode() int32 { - if b := c.currentBroker.Load(); b != nil { + c.controlMu.Lock() + defer c.controlMu.Unlock() + if b := c.currentBroker; b != nil { return b.node } return -1 } -func (c *Cluster) tryControl(kreq kmsg.Request, b *broker) (kresp kmsg.Response, err error, handled bool) { - c.currentBroker.Store(b) - defer c.currentBroker.Store(nil) +func (c *Cluster) tryControl(creq *clientReq) (kresp kmsg.Response, err error, handled bool) { c.controlMu.Lock() defer c.controlMu.Unlock() if len(c.control) == 0 { return nil, nil, false } - kresp, err, handled = c.tryControlKey(kreq.Key(), kreq, b) + kresp, err, handled = c.tryControlKey(creq.kreq.Key(), creq) if !handled { - kresp, err, handled = c.tryControlKey(-1, kreq, b) + kresp, err, handled = c.tryControlKey(-1, creq) } return kresp, err, handled } -func (c *Cluster) tryControlKey(key int16, kreq kmsg.Request, b *broker) (kresp kmsg.Response, err error, handled bool) { - for i, fn := range c.control[key] { - kresp, err, handled = c.callControl(key, kreq, fn) - if handled { - // fn may have called Control, ControlKey, or KeepControl, - // all of which will append to c.control; refresh the slice. - fns := c.control[key] - c.control[key] = append(fns[:i], fns[i+1:]...) - return +func (c *Cluster) tryControlKey(key int16, creq *clientReq) (kmsg.Response, error, bool) { + for cctx := range c.control[key] { + if cctx.lastReq[creq.cc] == creq { + continue + } + cctx.lastReq[creq.cc] = creq + res := c.runControl(cctx, creq) + select { + case res := <-res: + if res.handled { + c.popControl(cctx) + return res.kresp, res.err, true + } + case sleepChs := <-c.controlSleep: + c.beginSleptControl(&slept{ + cctx: cctx, + sleepChs: sleepChs, + res: res, + creq: creq, + }) + return nil, nil, true } } - return + return nil, nil, false } -func (c *Cluster) callControl(key int16, req kmsg.Request, fn controlFn) (kresp kmsg.Response, err error, handled bool) { - c.keepCurrentControl.Swap(false) +func (c *Cluster) runControl(cctx *controlCtx, creq *clientReq) chan controlResp { + res := make(chan controlResp, 1) + c.currentBroker = creq.cc.b + c.currentControl = cctx + // We unlock before entering a control function so that the control + // function can modify / add more control. We re-lock when exiting the + // control function. This does pose some weird control flow issues + // w.r.t. sleeping requests. Here, we have to re-lock before sending + // down res, otherwise we risk unlocking an unlocked mu in + // finishSleepControl. c.controlMu.Unlock() - defer func() { + go func() { + kresp, err, handled := cctx.fn(creq.kreq) c.controlMu.Lock() - if handled && c.keepCurrentControl.Swap(false) { - c.control[key] = append(c.control[key], fn) - } + c.currentControl = nil + c.currentBroker = nil + res <- controlResp{kresp, err, handled} }() - return fn(req) + return res +} + +func (c *Cluster) beginSleptControl(s *slept) { + // Control flow gets really weird here. We unlocked when entering the + // control function, so we have to re-lock now so that tryControl can + // unlock us safely. + bs := c.sleeping[s.creq.cc] + if bs == nil { + bs = &bsleep{c: c} + c.sleeping[s.creq.cc] = bs + } + bs.enqueue(s) + c.controlMu.Lock() + c.currentControl = nil + c.currentBroker = nil +} + +func (c *Cluster) continueSleptControl(s *slept) { + // When continuing a slept control, we are in the main run loop and are + // not currently under the control mu. We need to re-set the current + // broker and current control before resuming. + c.controlMu.Lock() + c.currentBroker = s.creq.cc.b + c.currentControl = s.cctx + c.controlMu.Unlock() + s.sleepChs.clientCont <- struct{}{} +} + +func (c *Cluster) finishSleptControl(s *slept) { + // When finishing a slept control, the control function exited and + // grabbed the control mu. We clear the control, unlock, and allow the + // slept control to be dequeued. + c.currentControl = nil + c.currentBroker = nil + c.controlMu.Unlock() + s.continueDequeue <- struct{}{} +} + +func (c *Cluster) resleepSleptControl(s *slept, sleepChs sleepChs) { + // A control function previously slept and is now again sleeping. We + // need to clear the control broker / etc, update the sleep channels, + // and allow the sleep dequeueing to continue. The control function + // will not be deqeueued in the loop because we updated sleepChs with + // a non-nil clientWait. + c.controlMu.Lock() + c.currentBroker = nil + c.currentControl = nil + c.controlMu.Unlock() + s.sleepChs = sleepChs + s.continueDequeue <- struct{}{} +} + +func (c *Cluster) popControl(cctx *controlCtx) { + if !cctx.keep { + delete(c.control[cctx.key], cctx) + } +} + +// bsleep manages sleeping requests on a connection to a broker, or +// non-sleeping requests that are waiting for sleeping requests to finish. +type bsleep struct { + c *Cluster + mu sync.Mutex + queue []*slept +} + +type slept struct { + cctx *controlCtx + sleepChs sleepChs + res <-chan controlResp + creq *clientReq + waiting bool + + continueDequeue chan struct{} +} + +type sleepChs struct { + clientWait chan struct{} + clientCont chan struct{} +} + +// enqueue has a few potential behaviors. +// +// * If s is waiting, this is a new request enqueueing to the back of an +// existing queue, where we are waiting for the head request to finish +// sleeping. Easy case. +// +// * If s is not waiting, this is a sleeping request. If the queue is empty, +// this is the first sleeping request on a node. We enqueue and start our wait +// goroutine. Easy. +// +// * If s is not waiting, but our queue is non-empty, this must be from a +// convoluted scenario: There was a request in front of us that slept, and we +// were waiting, and now we ourselves slept OR we previously slept, but we +// returned "not handled". We are now re-enqueueing ourself. Rather than add to +// the back, we update our head request with the new enqueued values. In this +// last case, bsleep is actually waiting for a signal down 'continueDequeue', +// and it will be signaled in the 'run' goroutine once tryControl returns +// (which it will, right after we are done here). We need to update values on +// the head. +func (bs *bsleep) enqueue(s *slept) bool { + if bs == nil { + return false + } + bs.mu.Lock() + defer bs.mu.Unlock() + if s.waiting { + if len(bs.queue) > 0 { + bs.queue = append(bs.queue, s) + return true + } + return false + } + if len(bs.queue) == 0 { + bs.queue = append(bs.queue, s) + go bs.wait() + return true + } + q0 := bs.queue[0] + if q0.creq != s.creq { + panic("internal error: sleeping request not head request") + } + q0.cctx = s.cctx + q0.sleepChs = s.sleepChs + q0.res = s.res + q0.waiting = s.waiting + return true +} + +func (bs *bsleep) wait() { + for { + bs.mu.Lock() + if len(bs.queue) == 0 { + bs.mu.Unlock() + return + } + q0 := bs.queue[0] + bs.mu.Unlock() + + if q0.continueDequeue == nil { + q0.continueDequeue = make(chan struct{}, 1) + } + + if q0.sleepChs.clientWait != nil { + select { + case <-bs.c.die: + case <-q0.sleepChs.clientWait: + q0.sleepChs.clientWait = nil + } + } + + select { + case <-bs.c.die: + return + case bs.c.wakeCh <- q0: + } + + <-q0.continueDequeue + if q0.sleepChs.clientWait == nil { + bs.mu.Lock() + bs.queue = bs.queue[1:] + bs.mu.Unlock() + } + } } // Various administrative requests can be passed into the cluster to simulate diff --git a/pkg/kfake/groups.go b/pkg/kfake/groups.go index a0a70b11..853c3d68 100644 --- a/pkg/kfake/groups.go +++ b/pkg/kfake/groups.go @@ -38,7 +38,7 @@ type ( protocols map[string]int protocol string - reqCh chan clientReq + reqCh chan *clientReq controlCh chan func() nJoining int @@ -58,7 +58,7 @@ type ( // waitingReply is non-nil if a client is waiting for a reply // from us for a JoinGroupRequest or a SyncGroupRequest. - waitingReply clientReq + waitingReply *clientReq assignment []byte @@ -105,7 +105,7 @@ func (c *Cluster) coordinator(id string) *broker { return c.bs[n] } -func (c *Cluster) validateGroup(creq clientReq, group string) *kerr.Error { +func (c *Cluster) validateGroup(creq *clientReq, group string) *kerr.Error { switch key := kmsg.Key(creq.kreq.Key()); key { case kmsg.OffsetCommit, kmsg.OffsetFetch, kmsg.DescribeGroups, kmsg.DeleteGroups: default: @@ -132,7 +132,7 @@ func generateMemberID(clientID string, instanceID *string) string { //////////// // handleJoin completely hijacks the incoming request. -func (gs *groups) handleJoin(creq clientReq) { +func (gs *groups) handleJoin(creq *clientReq) { if gs.gs == nil { gs.gs = make(map[string]*group) } @@ -147,7 +147,7 @@ start: members: make(map[string]*groupMember), pending: make(map[string]*groupMember), protocols: make(map[string]int), - reqCh: make(chan clientReq), + reqCh: make(chan *clientReq), controlCh: make(chan func()), quitCh: make(chan struct{}), } @@ -165,7 +165,7 @@ start: // Returns true if the request is hijacked and handled, otherwise false if the // group does not exist. -func (gs *groups) handleHijack(group string, creq clientReq) bool { +func (gs *groups) handleHijack(group string, creq *clientReq) bool { if gs.gs == nil { return false } @@ -181,27 +181,27 @@ func (gs *groups) handleHijack(group string, creq clientReq) bool { } } -func (gs *groups) handleSync(creq clientReq) bool { +func (gs *groups) handleSync(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.SyncGroupRequest).Group, creq) } -func (gs *groups) handleHeartbeat(creq clientReq) bool { +func (gs *groups) handleHeartbeat(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.HeartbeatRequest).Group, creq) } -func (gs *groups) handleLeave(creq clientReq) bool { +func (gs *groups) handleLeave(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq) } -func (gs *groups) handleOffsetCommit(creq clientReq) bool { +func (gs *groups) handleOffsetCommit(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq) } -func (gs *groups) handleOffsetDelete(creq clientReq) bool { +func (gs *groups) handleOffsetDelete(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.OffsetDeleteRequest).Group, creq) } -func (gs *groups) handleList(creq clientReq) *kmsg.ListGroupsResponse { +func (gs *groups) handleList(creq *clientReq) *kmsg.ListGroupsResponse { req := creq.kreq.(*kmsg.ListGroupsRequest) resp := req.ResponseKind().(*kmsg.ListGroupsResponse) @@ -233,7 +233,7 @@ func (gs *groups) handleList(creq clientReq) *kmsg.ListGroupsResponse { return resp } -func (gs *groups) handleDescribe(creq clientReq) *kmsg.DescribeGroupsResponse { +func (gs *groups) handleDescribe(creq *clientReq) *kmsg.DescribeGroupsResponse { req := creq.kreq.(*kmsg.DescribeGroupsRequest) resp := req.ResponseKind().(*kmsg.DescribeGroupsResponse) @@ -285,7 +285,7 @@ func (gs *groups) handleDescribe(creq clientReq) *kmsg.DescribeGroupsResponse { return resp } -func (gs *groups) handleDelete(creq clientReq) *kmsg.DeleteGroupsResponse { +func (gs *groups) handleDelete(creq *clientReq) *kmsg.DeleteGroupsResponse { req := creq.kreq.(*kmsg.DeleteGroupsRequest) resp := req.ResponseKind().(*kmsg.DeleteGroupsResponse) @@ -324,7 +324,7 @@ func (gs *groups) handleDelete(creq clientReq) *kmsg.DeleteGroupsResponse { return resp } -func (gs *groups) handleOffsetFetch(creq clientReq) *kmsg.OffsetFetchResponse { +func (gs *groups) handleOffsetFetch(creq *clientReq) *kmsg.OffsetFetchResponse { req := creq.kreq.(*kmsg.OffsetFetchRequest) resp := req.ResponseKind().(*kmsg.OffsetFetchResponse) @@ -423,7 +423,7 @@ func (gs *groups) handleOffsetFetch(creq clientReq) *kmsg.OffsetFetchResponse { return resp } -func (g *group) handleOffsetDelete(creq clientReq) *kmsg.OffsetDeleteResponse { +func (g *group) handleOffsetDelete(creq *clientReq) *kmsg.OffsetDeleteResponse { req := creq.kreq.(*kmsg.OffsetDeleteRequest) resp := req.ResponseKind().(*kmsg.OffsetDeleteResponse) @@ -588,7 +588,7 @@ func (g *group) quitOnce() { // to the client to immediately rejoin if a new client enters the group. // // If this returns nil, the request will be replied to later. -func (g *group) handleJoin(creq clientReq) (kmsg.Response, bool) { +func (g *group) handleJoin(creq *clientReq) (kmsg.Response, bool) { req := creq.kreq.(*kmsg.JoinGroupRequest) resp := req.ResponseKind().(*kmsg.JoinGroupResponse) @@ -664,7 +664,7 @@ func (g *group) handleJoin(creq clientReq) (kmsg.Response, bool) { } // Handles a sync, which can transition us to stable. -func (g *group) handleSync(creq clientReq) kmsg.Response { +func (g *group) handleSync(creq *clientReq) kmsg.Response { req := creq.kreq.(*kmsg.SyncGroupRequest) resp := req.ResponseKind().(*kmsg.SyncGroupResponse) @@ -715,7 +715,7 @@ func (g *group) handleSync(creq clientReq) kmsg.Response { // Handles a heartbeat, a relatively simple request that just delays our // session timeout timer. -func (g *group) handleHeartbeat(creq clientReq) kmsg.Response { +func (g *group) handleHeartbeat(creq *clientReq) kmsg.Response { req := creq.kreq.(*kmsg.HeartbeatRequest) resp := req.ResponseKind().(*kmsg.HeartbeatResponse) @@ -751,7 +751,7 @@ func (g *group) handleHeartbeat(creq clientReq) kmsg.Response { // Handles a leave. We trigger a rebalance for every member leaving in a batch // request, but that's fine because of our manage serialization. -func (g *group) handleLeave(creq clientReq) kmsg.Response { +func (g *group) handleLeave(creq *clientReq) kmsg.Response { req := creq.kreq.(*kmsg.LeaveGroupRequest) resp := req.ResponseKind().(*kmsg.LeaveGroupResponse) @@ -784,7 +784,7 @@ func (g *group) handleLeave(creq clientReq) kmsg.Response { g.stopPending(p) } } else { - g.updateMemberAndRebalance(m, clientReq{}, nil) + g.updateMemberAndRebalance(m, nil, nil) } } @@ -806,7 +806,7 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp } // Handles a commit. -func (g *group) handleOffsetCommit(creq clientReq) *kmsg.OffsetCommitResponse { +func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) @@ -985,7 +985,7 @@ func (g *group) completeLeaderSync(req *kmsg.SyncGroupRequest) { func (g *group) updateHeartbeat(m *groupMember) { g.atSessionTimeout(m, func() { - g.updateMemberAndRebalance(m, clientReq{}, nil) + g.updateMemberAndRebalance(m, nil, nil) }) } @@ -1024,7 +1024,7 @@ func (g *group) atSessionTimeout(m *groupMember, fn func()) { // This is used to update a member from a new join request, or to clear a // member from failed heartbeats. -func (g *group) updateMemberAndRebalance(m *groupMember, waitingReply clientReq, newJoin *kmsg.JoinGroupRequest) { +func (g *group) updateMemberAndRebalance(m *groupMember, waitingReply *clientReq, newJoin *kmsg.JoinGroupRequest) { for _, p := range m.join.Protocols { g.protocols[p.Name]-- } @@ -1050,7 +1050,7 @@ func (g *group) updateMemberAndRebalance(m *groupMember, waitingReply clientReq, } // Adds a new member to the group and rebalances. -func (g *group) addMemberAndRebalance(m *groupMember, waitingReply clientReq, join *kmsg.JoinGroupRequest) { +func (g *group) addMemberAndRebalance(m *groupMember, waitingReply *clientReq, join *kmsg.JoinGroupRequest) { g.stopPending(m) m.join = join for _, p := range m.join.Protocols { @@ -1132,14 +1132,14 @@ members: return metadata } -func (g *group) reply(creq clientReq, kresp kmsg.Response, m *groupMember) { +func (g *group) reply(creq *clientReq, kresp kmsg.Response, m *groupMember) { select { case creq.cc.respCh <- clientResp{kresp: kresp, corr: creq.corr, seq: creq.seq}: case <-g.c.die: return } if m != nil { - m.waitingReply = clientReq{} + m.waitingReply = nil g.updateHeartbeat(m) } } diff --git a/pkg/kfake/sasl.go b/pkg/kfake/sasl.go index 9140157b..413bb3bd 100644 --- a/pkg/kfake/sasl.go +++ b/pkg/kfake/sasl.go @@ -47,7 +47,7 @@ const ( saslStageComplete ) -func (c *Cluster) handleSASL(creq clientReq) (allow bool) { +func (c *Cluster) handleSASL(creq *clientReq) (allow bool) { switch creq.cc.saslStage { case saslStageBegin: switch creq.kreq.(type) {