Skip to content

Commit

Permalink
Merge pull request #638 from twmb/kfake_sleep
Browse files Browse the repository at this point in the history
kfake: add SleepControl
  • Loading branch information
twmb authored Dec 17, 2023
2 parents 070f8f9 + 3226dac commit a9e75b8
Show file tree
Hide file tree
Showing 17 changed files with 412 additions and 92 deletions.
4 changes: 2 additions & 2 deletions pkg/kfake/01_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func init() { regKey(1, 4, 13) }

func (c *Cluster) handleFetch(creq clientReq, w *watchFetch) (kmsg.Response, error) {
func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) {
var (
req = creq.kreq.(*kmsg.FetchRequest)
resp = req.ResponseKind().(*kmsg.FetchResponse)
Expand Down Expand Up @@ -182,7 +182,7 @@ type watchFetch struct {
need int
needp tps[int]
deadline time.Time
creq clientReq
creq *clientReq

in []*partData
cb func()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/08_offset_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(8, 0, 8) }

func (c *Cluster) handleOffsetCommit(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/09_offset_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(9, 0, 8) }

func (c *Cluster) handleOffsetFetch(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleOffsetFetch(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetFetchRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/11_join_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(11, 0, 9) }

func (c *Cluster) handleJoinGroup(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleJoinGroup(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.JoinGroupRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/12_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(12, 0, 4) }

func (c *Cluster) handleHeartbeat(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleHeartbeat(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.HeartbeatRequest)
resp := req.ResponseKind().(*kmsg.HeartbeatResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/13_leave_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(13, 0, 5) }

func (c *Cluster) handleLeaveGroup(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleLeaveGroup(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.LeaveGroupRequest)
resp := req.ResponseKind().(*kmsg.LeaveGroupResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/14_sync_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(14, 0, 5) }

func (c *Cluster) handleSyncGroup(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleSyncGroup(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SyncGroupRequest)
resp := req.ResponseKind().(*kmsg.SyncGroupResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/15_describe_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(15, 0, 5) }

func (c *Cluster) handleDescribeGroups(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleDescribeGroups(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.DescribeGroupsRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/16_list_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(16, 0, 4) }

func (c *Cluster) handleListGroups(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleListGroups(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.ListGroupsRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/17_sasl_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(17, 1, 1) }

func (c *Cluster) handleSASLHandshake(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleSASLHandshake(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SASLHandshakeRequest)
resp := req.ResponseKind().(*kmsg.SASLHandshakeResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/36_sasl_authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func init() { regKey(36, 0, 2) }

func (c *Cluster) handleSASLAuthenticate(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleSASLAuthenticate(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SASLAuthenticateRequest)
resp := req.ResponseKind().(*kmsg.SASLAuthenticateResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/42_delete_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(42, 0, 2) }

func (c *Cluster) handleDeleteGroups(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleDeleteGroups(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.DeleteGroupsRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/47_offset_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(47, 0, 0) }

func (c *Cluster) handleOffsetDelete(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleOffsetDelete(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetDeleteRequest)
resp := req.ResponseKind().(*kmsg.OffsetDeleteResponse)

Expand Down
9 changes: 6 additions & 3 deletions pkg/kfake/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type (
cc *clientConn
kreq kmsg.Request
at time.Time
corr int32
cid string
corr int32
seq uint32
}

Expand All @@ -38,7 +38,7 @@ type (
}
)

func (creq clientReq) empty() bool { return creq.cc == nil || creq.kreq == nil }
func (creq *clientReq) empty() bool { return creq == nil || creq.cc == nil || creq.kreq == nil }

func (cc *clientConn) read() {
defer cc.conn.Close()
Expand Down Expand Up @@ -101,7 +101,7 @@ func (cc *clientConn) read() {
}

select {
case cc.c.reqCh <- clientReq{cc, kreq, time.Now(), corr, cid, seq}:
case cc.c.reqCh <- &clientReq{cc, kreq, time.Now(), cid, corr, seq}:
seq++
case <-cc.c.die:
return
Expand Down Expand Up @@ -141,6 +141,9 @@ func (cc *clientConn) write() {
case <-cc.c.die:
return
}
} else {
delete(oooresp, seq)
seq++
}
if err := resp.err; err != nil {
cc.c.cfg.logger.Logf(LogLevelInfo, "client %s request unable to be handled: %v", who, err)
Expand Down
Loading

0 comments on commit a9e75b8

Please sign in to comment.