diff --git a/pkg/kfake/01_fetch.go b/pkg/kfake/01_fetch.go index 08053d1e..f0fc4f07 100644 --- a/pkg/kfake/01_fetch.go +++ b/pkg/kfake/01_fetch.go @@ -18,7 +18,7 @@ import ( func init() { regKey(1, 4, 13) } -func (c *Cluster) handleFetch(creq clientReq, w *watchFetch) (kmsg.Response, error) { +func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) { var ( req = creq.kreq.(*kmsg.FetchRequest) resp = req.ResponseKind().(*kmsg.FetchResponse) @@ -182,7 +182,7 @@ type watchFetch struct { need int needp tps[int] deadline time.Time - creq clientReq + creq *clientReq in []*partData cb func() diff --git a/pkg/kfake/08_offset_commit.go b/pkg/kfake/08_offset_commit.go index 69e9dab7..692f4cc0 100644 --- a/pkg/kfake/08_offset_commit.go +++ b/pkg/kfake/08_offset_commit.go @@ -7,7 +7,7 @@ import ( func init() { regKey(8, 0, 8) } -func (c *Cluster) handleOffsetCommit(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) diff --git a/pkg/kfake/09_offset_fetch.go b/pkg/kfake/09_offset_fetch.go index 24047d4f..204339ec 100644 --- a/pkg/kfake/09_offset_fetch.go +++ b/pkg/kfake/09_offset_fetch.go @@ -6,7 +6,7 @@ import ( func init() { regKey(9, 0, 8) } -func (c *Cluster) handleOffsetFetch(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleOffsetFetch(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetFetchRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/11_join_group.go b/pkg/kfake/11_join_group.go index ef537785..70ed1d89 100644 --- a/pkg/kfake/11_join_group.go +++ b/pkg/kfake/11_join_group.go @@ -6,7 +6,7 @@ import ( func init() { regKey(11, 0, 9) } -func (c *Cluster) handleJoinGroup(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleJoinGroup(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.JoinGroupRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/12_heartbeat.go b/pkg/kfake/12_heartbeat.go index b8e1a188..59f12712 100644 --- a/pkg/kfake/12_heartbeat.go +++ b/pkg/kfake/12_heartbeat.go @@ -7,7 +7,7 @@ import ( func init() { regKey(12, 0, 4) } -func (c *Cluster) handleHeartbeat(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleHeartbeat(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.HeartbeatRequest) resp := req.ResponseKind().(*kmsg.HeartbeatResponse) diff --git a/pkg/kfake/13_leave_group.go b/pkg/kfake/13_leave_group.go index 9f17ec6c..e941f19e 100644 --- a/pkg/kfake/13_leave_group.go +++ b/pkg/kfake/13_leave_group.go @@ -7,7 +7,7 @@ import ( func init() { regKey(13, 0, 5) } -func (c *Cluster) handleLeaveGroup(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleLeaveGroup(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.LeaveGroupRequest) resp := req.ResponseKind().(*kmsg.LeaveGroupResponse) diff --git a/pkg/kfake/14_sync_group.go b/pkg/kfake/14_sync_group.go index 07ea0538..9944b112 100644 --- a/pkg/kfake/14_sync_group.go +++ b/pkg/kfake/14_sync_group.go @@ -7,7 +7,7 @@ import ( func init() { regKey(14, 0, 5) } -func (c *Cluster) handleSyncGroup(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleSyncGroup(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.SyncGroupRequest) resp := req.ResponseKind().(*kmsg.SyncGroupResponse) diff --git a/pkg/kfake/15_describe_groups.go b/pkg/kfake/15_describe_groups.go index 1152bcf8..8791759b 100644 --- a/pkg/kfake/15_describe_groups.go +++ b/pkg/kfake/15_describe_groups.go @@ -6,7 +6,7 @@ import ( func init() { regKey(15, 0, 5) } -func (c *Cluster) handleDescribeGroups(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleDescribeGroups(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.DescribeGroupsRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/16_list_groups.go b/pkg/kfake/16_list_groups.go index e254284b..6d0189c4 100644 --- a/pkg/kfake/16_list_groups.go +++ b/pkg/kfake/16_list_groups.go @@ -6,7 +6,7 @@ import ( func init() { regKey(16, 0, 4) } -func (c *Cluster) handleListGroups(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleListGroups(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.ListGroupsRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/17_sasl_handshake.go b/pkg/kfake/17_sasl_handshake.go index 34028bc8..8a80cbf1 100644 --- a/pkg/kfake/17_sasl_handshake.go +++ b/pkg/kfake/17_sasl_handshake.go @@ -7,7 +7,7 @@ import ( func init() { regKey(17, 1, 1) } -func (c *Cluster) handleSASLHandshake(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleSASLHandshake(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.SASLHandshakeRequest) resp := req.ResponseKind().(*kmsg.SASLHandshakeResponse) diff --git a/pkg/kfake/36_sasl_authenticate.go b/pkg/kfake/36_sasl_authenticate.go index ec0e2a97..b94d2f01 100644 --- a/pkg/kfake/36_sasl_authenticate.go +++ b/pkg/kfake/36_sasl_authenticate.go @@ -9,7 +9,7 @@ import ( func init() { regKey(36, 0, 2) } -func (c *Cluster) handleSASLAuthenticate(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleSASLAuthenticate(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.SASLAuthenticateRequest) resp := req.ResponseKind().(*kmsg.SASLAuthenticateResponse) diff --git a/pkg/kfake/42_delete_groups.go b/pkg/kfake/42_delete_groups.go index e820b94f..68257415 100644 --- a/pkg/kfake/42_delete_groups.go +++ b/pkg/kfake/42_delete_groups.go @@ -6,7 +6,7 @@ import ( func init() { regKey(42, 0, 2) } -func (c *Cluster) handleDeleteGroups(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleDeleteGroups(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.DeleteGroupsRequest) if err := checkReqVersion(req.Key(), req.Version); err != nil { diff --git a/pkg/kfake/47_offset_delete.go b/pkg/kfake/47_offset_delete.go index 900b8bee..878e83b4 100644 --- a/pkg/kfake/47_offset_delete.go +++ b/pkg/kfake/47_offset_delete.go @@ -7,7 +7,7 @@ import ( func init() { regKey(47, 0, 0) } -func (c *Cluster) handleOffsetDelete(creq clientReq) (kmsg.Response, error) { +func (c *Cluster) handleOffsetDelete(creq *clientReq) (kmsg.Response, error) { req := creq.kreq.(*kmsg.OffsetDeleteRequest) resp := req.ResponseKind().(*kmsg.OffsetDeleteResponse) diff --git a/pkg/kfake/client_conn.go b/pkg/kfake/client_conn.go index 2d6b89f1..a78b574f 100644 --- a/pkg/kfake/client_conn.go +++ b/pkg/kfake/client_conn.go @@ -25,8 +25,8 @@ type ( cc *clientConn kreq kmsg.Request at time.Time - corr int32 cid string + corr int32 seq uint32 } @@ -38,7 +38,7 @@ type ( } ) -func (creq clientReq) empty() bool { return creq.cc == nil || creq.kreq == nil } +func (creq *clientReq) empty() bool { return creq == nil || creq.cc == nil || creq.kreq == nil } func (cc *clientConn) read() { defer cc.conn.Close() @@ -101,7 +101,7 @@ func (cc *clientConn) read() { } select { - case cc.c.reqCh <- clientReq{cc, kreq, time.Now(), corr, cid, seq}: + case cc.c.reqCh <- &clientReq{cc, kreq, time.Now(), cid, corr, seq}: seq++ case <-cc.c.die: return @@ -141,6 +141,9 @@ func (cc *clientConn) write() { case <-cc.c.die: return } + } else { + delete(oooresp, seq) + seq++ } if err := resp.err; err != nil { cc.c.cfg.logger.Logf(LogLevelInfo, "client %s request unable to be handled: %v", who, err) diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go index 0fc46a8d..bdeb736d 100644 --- a/pkg/kfake/cluster.go +++ b/pkg/kfake/cluster.go @@ -30,13 +30,16 @@ type ( bs []*broker adminCh chan func() - reqCh chan clientReq + reqCh chan *clientReq + wakeCh chan *slept watchFetchCh chan *watchFetch - controlMu sync.Mutex - control map[int16][]controlFn - keepCurrentControl atomic.Bool - currentBroker atomic.Pointer[broker] + controlMu sync.Mutex + control map[int16]map[*controlCtx]struct{} + currentBroker *broker + currentControl *controlCtx + sleeping map[*clientConn]*bsleep + controlSleep chan sleepChs data data pids pids @@ -56,6 +59,19 @@ type ( } controlFn func(kmsg.Request) (kmsg.Response, error, bool) + + controlCtx struct { + key int16 + fn controlFn + keep bool + lastReq map[*clientConn]*clientReq // used to not re-run requests that slept, see doc comments below + } + + controlResp struct { + kresp kmsg.Response + err error + handled bool + } ) // MustCluster is like NewCluster, but panics on error. @@ -91,9 +107,13 @@ func NewCluster(opts ...Opt) (*Cluster, error) { cfg: cfg, adminCh: make(chan func()), - reqCh: make(chan clientReq, 20), + reqCh: make(chan *clientReq, 20), + wakeCh: make(chan *slept, 10), watchFetchCh: make(chan *watchFetch, 20), - control: make(map[int16][]controlFn), + control: make(map[int16]map[*controlCtx]struct{}), + controlSleep: make(chan sleepChs, 1), + + sleeping: make(map[*clientConn]*bsleep), data: data{ id2t: make(map[uuid]string), @@ -235,27 +255,100 @@ func (b *broker) listen() { func (c *Cluster) run() { for { - var creq clientReq - var w *watchFetch + var ( + creq *clientReq + w *watchFetch + s *slept + kreq kmsg.Request + kresp kmsg.Response + err error + handled bool + ) select { + case <-c.die: + return + + case admin := <-c.adminCh: + // Run a custom request in the context of the cluster. + admin() + continue + case creq = <-c.reqCh: + // If we have any sleeping request on this node, + // we enqueue the new live request to the end and + // wait for the sleeping request to finish. + bs := c.sleeping[creq.cc] + if bs.enqueue(&slept{ + creq: creq, + waiting: true, + ran: make(chan struct{}, 1), + }) { + continue + } + + case s = <-c.wakeCh: + // On wakeup, we know we are handling a control + // function that was slept, or a request that was + // waiting for a control function to finish sleeping. + creq = s.creq + if s.waiting { + break + } + + // We are handling a previously slept control function. + // If it finishes, we take the resp/err/handled and + // unconditionally dequeue it from the sleep goro + // queue. + // + // It is possible the control function still returns + // "I did not handle this request" -- that is fine, we + // just do not pop the ctx. The controlCtx will ensure we + // do not process this request again via lastReq. + // + // Control flow gets really weird here. The controlMu + // is unlocked. When a control function exits, it + // locks. Since we are not *in* tryControl, we actually + // want to leave the mu unlocked. So, if we get a res, + // we unlock the mu. See below where "weird" is again + // mentioned. + c.controlMu.Lock() + c.currentBroker = s.creq.cc.b + c.currentControl = s.cctx + c.controlMu.Unlock() + s.cchs.clientCont <- struct{}{} + select { + case res := <-s.res: + c.currentControl = nil + c.currentBroker = nil + c.controlMu.Unlock() + kresp, err, handled = res.kresp, res.err, res.handled + s.ran <- struct{}{} + cctx := s.cctx + s = nil + if handled { + c.popControl(cctx) + goto afterControl + } + case cchs := <-c.controlSleep: + c.controlMu.Lock() + c.currentBroker = nil + c.currentControl = nil + c.controlMu.Unlock() + s.cchs = cchs + s.ran <- struct{}{} + continue + } + case w = <-c.watchFetchCh: if w.cleaned { continue // already cleaned up, this is an extraneous timer fire } w.cleanup(c) creq = w.creq - case <-c.die: - return - case fn := <-c.adminCh: - // Run a custom request in the context of the cluster - fn() - continue } - kreq := creq.kreq - kresp, err, handled := c.tryControl(kreq, creq.cc.b) + kresp, err, handled = c.tryControl(creq) if handled { goto afterControl } @@ -267,6 +360,7 @@ func (c *Cluster) run() { } } + kreq = creq.kreq switch k := kmsg.Key(kreq.Key()); k { case kmsg.Produce: kresp, err = c.handleProduce(creq.cc.b, kreq) @@ -331,11 +425,14 @@ func (c *Cluster) run() { case kmsg.AlterUserSCRAMCredentials: kresp, err = c.handleAlterUserSCRAMCredentials(creq.cc.b, kreq) default: - err = fmt.Errorf("unahndled key %v", k) + err = fmt.Errorf("unhandled key %v", k) } afterControl: - if kresp == nil && err == nil { // produce request with no acks, or hijacked group request + if s != nil { + s.ran <- struct{}{} + } + if kresp == nil && err == nil { // produce request with no acks, or otherwise hijacked request (group, sleep) continue } @@ -363,9 +460,7 @@ func (c *Cluster) run() { // It is safe to add new control functions within a control function. Control // functions are not called concurrently. func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) { - c.controlMu.Lock() - defer c.controlMu.Unlock() - c.control[-1] = append(c.control[-1], fn) + c.ControlKey(-1, fn) } // Control is a function to call on a specific request key that the cluster @@ -386,65 +481,265 @@ func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) { func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool)) { c.controlMu.Lock() defer c.controlMu.Unlock() - c.control[key] = append(c.control[key], fn) + m := c.control[key] + if m == nil { + m = make(map[*controlCtx]struct{}) + c.control[key] = m + } + m[&controlCtx{ + key: key, + fn: fn, + lastReq: make(map[*clientConn]*clientReq), + }] = struct{}{} } // KeepControl marks the currently running control function to be kept even if // you handle the request and return true. This can be used to continuously // control requests without needing to re-add control functions manually. func (c *Cluster) KeepControl() { - c.keepCurrentControl.Swap(true) + c.controlMu.Lock() + defer c.controlMu.Unlock() + if c.currentControl != nil { + c.currentControl.keep = true + } +} + +// SleepControl sleeps the current control function until wakeup returns. This +// yields to run any other connection. +// +// Note that per protocol, requests on the same connection must be replied to +// in order. Many clients write multiple requests to the same connection, so +// if you sleep until a different request runs, you may sleep forever -- you +// must know the semantics of your client to know whether requests run on +// different connections (or, ensure you are writing to different brokers). +// +// For example, franz-go uses a dedicated connection for: +// - produce requests +// - fetch requests +// - join&sync requests +// - requests with a Timeout field +// - all other request +// +// So, for franz-go, there are up to five separate connections depending +// on what you are doing. +// +// You can run SleepControl multiple times in the same control function. If you +// sleep a request you are controlling, and another request of the same key +// comes in, it will run the same control function and may also sleep (i.e., +// you must have logic if you want to avoid sleeping on the same request). +func (c *Cluster) SleepControl(wakeup func()) { + c.controlMu.Lock() + if c.currentControl == nil { + c.controlMu.Unlock() + return + } + c.controlMu.Unlock() + + cchs := sleepChs{ + clientWait: make(chan struct{}, 1), + clientCont: make(chan struct{}, 1), + } + go func() { + wakeup() + cchs.clientWait <- struct{}{} + }() + + c.controlSleep <- cchs + <-cchs.clientCont } // CurrentNode is solely valid from within a control function; it returns // the broker id that the request was received by. // If there's no request currently inflight, this returns -1. func (c *Cluster) CurrentNode() int32 { - if b := c.currentBroker.Load(); b != nil { + c.controlMu.Lock() + defer c.controlMu.Unlock() + if b := c.currentBroker; b != nil { return b.node } return -1 } -func (c *Cluster) tryControl(kreq kmsg.Request, b *broker) (kresp kmsg.Response, err error, handled bool) { - c.currentBroker.Store(b) - defer c.currentBroker.Store(nil) +func (c *Cluster) tryControl(creq *clientReq) (kresp kmsg.Response, err error, handled bool) { c.controlMu.Lock() defer c.controlMu.Unlock() if len(c.control) == 0 { return nil, nil, false } - kresp, err, handled = c.tryControlKey(kreq.Key(), kreq, b) + kresp, err, handled = c.tryControlKey(creq.kreq.Key(), creq) if !handled { - kresp, err, handled = c.tryControlKey(-1, kreq, b) + kresp, err, handled = c.tryControlKey(-1, creq) } return kresp, err, handled } -func (c *Cluster) tryControlKey(key int16, kreq kmsg.Request, b *broker) (kresp kmsg.Response, err error, handled bool) { - for i, fn := range c.control[key] { - kresp, err, handled = c.callControl(key, kreq, fn) - if handled { - // fn may have called Control, ControlKey, or KeepControl, - // all of which will append to c.control; refresh the slice. - fns := c.control[key] - c.control[key] = append(fns[:i], fns[i+1:]...) - return +func (c *Cluster) tryControlKey(key int16, creq *clientReq) (kmsg.Response, error, bool) { + for cctx := range c.control[key] { + if cctx.lastReq[creq.cc] == creq { + continue + } + cctx.lastReq[creq.cc] = creq + res := make(chan controlResp, 1) + go func() { + // We unlock before entering a control function so that + // the control function can modify / add more control. + // We re-lock when exiting the control function. + c.currentBroker = creq.cc.b + c.currentControl = cctx + c.controlMu.Unlock() + defer func() { + c.controlMu.Lock() + c.currentControl = nil + c.currentBroker = nil + }() + kresp, err, handled := cctx.fn(creq.kreq) + res <- controlResp{kresp, err, handled} + }() + select { + case res := <-res: + if res.handled { + c.popControl(cctx) + return res.kresp, res.err, true + } + case cchs := <-c.controlSleep: + bs := c.sleeping[creq.cc] + if bs == nil { + bs = &bsleep{c: c} + c.sleeping[creq.cc] = bs + } + bs.enqueue(&slept{ + cctx: cctx, + cchs: cchs, + res: res, + creq: creq, + ran: make(chan struct{}, 1), + }) + // Control flow gets really weird here. We unlocked + // when entering the control function, so we have to + // re-lock now to be unlocked by the caller. However, + // before we close the signal channel, we have to + // re-lock and set the currentControl/currentBroker. + // This is *super* ugly. + c.controlMu.Lock() + c.currentControl = nil + c.currentBroker = nil + return nil, nil, true } } - return + return nil, nil, false } -func (c *Cluster) callControl(key int16, req kmsg.Request, fn controlFn) (kresp kmsg.Response, err error, handled bool) { - c.keepCurrentControl.Swap(false) - c.controlMu.Unlock() - defer func() { - c.controlMu.Lock() - if handled && c.keepCurrentControl.Swap(false) { - c.control[key] = append(c.control[key], fn) +func (c *Cluster) popControl(cctx *controlCtx) { + if !cctx.keep { + delete(c.control[cctx.key], cctx) + } +} + +// bsleep manages sleeping requests on a broker, or non-sleeping requests +// that are waiting for sleeping requests to finish. +type bsleep struct { + c *Cluster + mu sync.Mutex + queue []*slept +} + +// slept is a sleeping request. If sig is nil, this request is enqueued +// after a previous sleeping req. We only pop from slept once the current +// request is done -- it is possible for a request to sleep multiple times, +type slept struct { + cctx *controlCtx + cchs sleepChs + res <-chan controlResp + creq *clientReq + waiting bool + + ran chan struct{} +} + +type sleepChs struct { + clientWait chan struct{} + clientCont chan struct{} +} + +// enqueue has a few potential behaviors. +// +// * If s is waiting, this is a new request enqueueing to the back of an +// existing queue, where we are waiting for the head request to finish +// sleeping. Easy case. +// +// * If s is not waiting, this is a sleeping request. If the queue is empty, +// this is the first sleeping request on a node. We enqueue and start our wait +// goroutine. Easy. +// +// * If s is not waiting, but our queue is non-empty, this must be from a +// convoluted scenario: There was a request in front of us that slept, and we +// were waiting. When we dequeued, we ran and we ourselves now slept. We are +// now re-enqueueing ourself. Rather than add to the back, we update our head +// request with the new enqueued values. In this last case, bsleep is actually +// waiting for a signal down 'ran', and it will be signaled in the 'run' +// goroutine once tryControl returns (which it will, right after we are done +// here). We need to update values on the head. Since ran is currently being +// read, we update everything but ran. +func (bs *bsleep) enqueue(s *slept) bool { + if bs == nil { + return false + } + bs.mu.Lock() + defer bs.mu.Unlock() + if s.waiting { + if len(bs.queue) > 0 { + bs.queue = append(bs.queue, s) + return true } - }() - return fn(req) + return false + } + if len(bs.queue) == 0 { + bs.queue = append(bs.queue, s) + go bs.wait() + return true + } + q0 := bs.queue[0] + if q0.creq != s.creq { + panic("internal error: sleeping request not head request") + } + q0.cctx = s.cctx + q0.cchs = s.cchs + q0.res = s.res + q0.waiting = s.waiting + return true +} + +func (bs *bsleep) wait() { + for { + bs.mu.Lock() + if len(bs.queue) == 0 { + bs.mu.Unlock() + return + } + q0 := bs.queue[0] + bs.mu.Unlock() + + if q0.cchs.clientWait != nil { + select { + case <-bs.c.die: + case <-q0.cchs.clientWait: + q0.cchs.clientWait = nil + } + } + + select { + case <-bs.c.die: + return + case bs.c.wakeCh <- q0: + } + + <-q0.ran + if q0.cchs.clientWait == nil { + bs.mu.Lock() + bs.queue = bs.queue[1:] + bs.mu.Unlock() + } + } } // Various administrative requests can be passed into the cluster to simulate diff --git a/pkg/kfake/groups.go b/pkg/kfake/groups.go index a0a70b11..853c3d68 100644 --- a/pkg/kfake/groups.go +++ b/pkg/kfake/groups.go @@ -38,7 +38,7 @@ type ( protocols map[string]int protocol string - reqCh chan clientReq + reqCh chan *clientReq controlCh chan func() nJoining int @@ -58,7 +58,7 @@ type ( // waitingReply is non-nil if a client is waiting for a reply // from us for a JoinGroupRequest or a SyncGroupRequest. - waitingReply clientReq + waitingReply *clientReq assignment []byte @@ -105,7 +105,7 @@ func (c *Cluster) coordinator(id string) *broker { return c.bs[n] } -func (c *Cluster) validateGroup(creq clientReq, group string) *kerr.Error { +func (c *Cluster) validateGroup(creq *clientReq, group string) *kerr.Error { switch key := kmsg.Key(creq.kreq.Key()); key { case kmsg.OffsetCommit, kmsg.OffsetFetch, kmsg.DescribeGroups, kmsg.DeleteGroups: default: @@ -132,7 +132,7 @@ func generateMemberID(clientID string, instanceID *string) string { //////////// // handleJoin completely hijacks the incoming request. -func (gs *groups) handleJoin(creq clientReq) { +func (gs *groups) handleJoin(creq *clientReq) { if gs.gs == nil { gs.gs = make(map[string]*group) } @@ -147,7 +147,7 @@ start: members: make(map[string]*groupMember), pending: make(map[string]*groupMember), protocols: make(map[string]int), - reqCh: make(chan clientReq), + reqCh: make(chan *clientReq), controlCh: make(chan func()), quitCh: make(chan struct{}), } @@ -165,7 +165,7 @@ start: // Returns true if the request is hijacked and handled, otherwise false if the // group does not exist. -func (gs *groups) handleHijack(group string, creq clientReq) bool { +func (gs *groups) handleHijack(group string, creq *clientReq) bool { if gs.gs == nil { return false } @@ -181,27 +181,27 @@ func (gs *groups) handleHijack(group string, creq clientReq) bool { } } -func (gs *groups) handleSync(creq clientReq) bool { +func (gs *groups) handleSync(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.SyncGroupRequest).Group, creq) } -func (gs *groups) handleHeartbeat(creq clientReq) bool { +func (gs *groups) handleHeartbeat(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.HeartbeatRequest).Group, creq) } -func (gs *groups) handleLeave(creq clientReq) bool { +func (gs *groups) handleLeave(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.LeaveGroupRequest).Group, creq) } -func (gs *groups) handleOffsetCommit(creq clientReq) bool { +func (gs *groups) handleOffsetCommit(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.OffsetCommitRequest).Group, creq) } -func (gs *groups) handleOffsetDelete(creq clientReq) bool { +func (gs *groups) handleOffsetDelete(creq *clientReq) bool { return gs.handleHijack(creq.kreq.(*kmsg.OffsetDeleteRequest).Group, creq) } -func (gs *groups) handleList(creq clientReq) *kmsg.ListGroupsResponse { +func (gs *groups) handleList(creq *clientReq) *kmsg.ListGroupsResponse { req := creq.kreq.(*kmsg.ListGroupsRequest) resp := req.ResponseKind().(*kmsg.ListGroupsResponse) @@ -233,7 +233,7 @@ func (gs *groups) handleList(creq clientReq) *kmsg.ListGroupsResponse { return resp } -func (gs *groups) handleDescribe(creq clientReq) *kmsg.DescribeGroupsResponse { +func (gs *groups) handleDescribe(creq *clientReq) *kmsg.DescribeGroupsResponse { req := creq.kreq.(*kmsg.DescribeGroupsRequest) resp := req.ResponseKind().(*kmsg.DescribeGroupsResponse) @@ -285,7 +285,7 @@ func (gs *groups) handleDescribe(creq clientReq) *kmsg.DescribeGroupsResponse { return resp } -func (gs *groups) handleDelete(creq clientReq) *kmsg.DeleteGroupsResponse { +func (gs *groups) handleDelete(creq *clientReq) *kmsg.DeleteGroupsResponse { req := creq.kreq.(*kmsg.DeleteGroupsRequest) resp := req.ResponseKind().(*kmsg.DeleteGroupsResponse) @@ -324,7 +324,7 @@ func (gs *groups) handleDelete(creq clientReq) *kmsg.DeleteGroupsResponse { return resp } -func (gs *groups) handleOffsetFetch(creq clientReq) *kmsg.OffsetFetchResponse { +func (gs *groups) handleOffsetFetch(creq *clientReq) *kmsg.OffsetFetchResponse { req := creq.kreq.(*kmsg.OffsetFetchRequest) resp := req.ResponseKind().(*kmsg.OffsetFetchResponse) @@ -423,7 +423,7 @@ func (gs *groups) handleOffsetFetch(creq clientReq) *kmsg.OffsetFetchResponse { return resp } -func (g *group) handleOffsetDelete(creq clientReq) *kmsg.OffsetDeleteResponse { +func (g *group) handleOffsetDelete(creq *clientReq) *kmsg.OffsetDeleteResponse { req := creq.kreq.(*kmsg.OffsetDeleteRequest) resp := req.ResponseKind().(*kmsg.OffsetDeleteResponse) @@ -588,7 +588,7 @@ func (g *group) quitOnce() { // to the client to immediately rejoin if a new client enters the group. // // If this returns nil, the request will be replied to later. -func (g *group) handleJoin(creq clientReq) (kmsg.Response, bool) { +func (g *group) handleJoin(creq *clientReq) (kmsg.Response, bool) { req := creq.kreq.(*kmsg.JoinGroupRequest) resp := req.ResponseKind().(*kmsg.JoinGroupResponse) @@ -664,7 +664,7 @@ func (g *group) handleJoin(creq clientReq) (kmsg.Response, bool) { } // Handles a sync, which can transition us to stable. -func (g *group) handleSync(creq clientReq) kmsg.Response { +func (g *group) handleSync(creq *clientReq) kmsg.Response { req := creq.kreq.(*kmsg.SyncGroupRequest) resp := req.ResponseKind().(*kmsg.SyncGroupResponse) @@ -715,7 +715,7 @@ func (g *group) handleSync(creq clientReq) kmsg.Response { // Handles a heartbeat, a relatively simple request that just delays our // session timeout timer. -func (g *group) handleHeartbeat(creq clientReq) kmsg.Response { +func (g *group) handleHeartbeat(creq *clientReq) kmsg.Response { req := creq.kreq.(*kmsg.HeartbeatRequest) resp := req.ResponseKind().(*kmsg.HeartbeatResponse) @@ -751,7 +751,7 @@ func (g *group) handleHeartbeat(creq clientReq) kmsg.Response { // Handles a leave. We trigger a rebalance for every member leaving in a batch // request, but that's fine because of our manage serialization. -func (g *group) handleLeave(creq clientReq) kmsg.Response { +func (g *group) handleLeave(creq *clientReq) kmsg.Response { req := creq.kreq.(*kmsg.LeaveGroupRequest) resp := req.ResponseKind().(*kmsg.LeaveGroupResponse) @@ -784,7 +784,7 @@ func (g *group) handleLeave(creq clientReq) kmsg.Response { g.stopPending(p) } } else { - g.updateMemberAndRebalance(m, clientReq{}, nil) + g.updateMemberAndRebalance(m, nil, nil) } } @@ -806,7 +806,7 @@ func fillOffsetCommit(req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResp } // Handles a commit. -func (g *group) handleOffsetCommit(creq clientReq) *kmsg.OffsetCommitResponse { +func (g *group) handleOffsetCommit(creq *clientReq) *kmsg.OffsetCommitResponse { req := creq.kreq.(*kmsg.OffsetCommitRequest) resp := req.ResponseKind().(*kmsg.OffsetCommitResponse) @@ -985,7 +985,7 @@ func (g *group) completeLeaderSync(req *kmsg.SyncGroupRequest) { func (g *group) updateHeartbeat(m *groupMember) { g.atSessionTimeout(m, func() { - g.updateMemberAndRebalance(m, clientReq{}, nil) + g.updateMemberAndRebalance(m, nil, nil) }) } @@ -1024,7 +1024,7 @@ func (g *group) atSessionTimeout(m *groupMember, fn func()) { // This is used to update a member from a new join request, or to clear a // member from failed heartbeats. -func (g *group) updateMemberAndRebalance(m *groupMember, waitingReply clientReq, newJoin *kmsg.JoinGroupRequest) { +func (g *group) updateMemberAndRebalance(m *groupMember, waitingReply *clientReq, newJoin *kmsg.JoinGroupRequest) { for _, p := range m.join.Protocols { g.protocols[p.Name]-- } @@ -1050,7 +1050,7 @@ func (g *group) updateMemberAndRebalance(m *groupMember, waitingReply clientReq, } // Adds a new member to the group and rebalances. -func (g *group) addMemberAndRebalance(m *groupMember, waitingReply clientReq, join *kmsg.JoinGroupRequest) { +func (g *group) addMemberAndRebalance(m *groupMember, waitingReply *clientReq, join *kmsg.JoinGroupRequest) { g.stopPending(m) m.join = join for _, p := range m.join.Protocols { @@ -1132,14 +1132,14 @@ members: return metadata } -func (g *group) reply(creq clientReq, kresp kmsg.Response, m *groupMember) { +func (g *group) reply(creq *clientReq, kresp kmsg.Response, m *groupMember) { select { case creq.cc.respCh <- clientResp{kresp: kresp, corr: creq.corr, seq: creq.seq}: case <-g.c.die: return } if m != nil { - m.waitingReply = clientReq{} + m.waitingReply = nil g.updateHeartbeat(m) } } diff --git a/pkg/kfake/sasl.go b/pkg/kfake/sasl.go index 9140157b..413bb3bd 100644 --- a/pkg/kfake/sasl.go +++ b/pkg/kfake/sasl.go @@ -47,7 +47,7 @@ const ( saslStageComplete ) -func (c *Cluster) handleSASL(creq clientReq) (allow bool) { +func (c *Cluster) handleSASL(creq *clientReq) (allow bool) { switch creq.cc.saslStage { case saslStageBegin: switch creq.kreq.(type) {