Skip to content

Commit

Permalink
kfake: add SleepControl
Browse files Browse the repository at this point in the history
This function allows you to sleep a function you are controlling until
your wakeup function returns. The control function effectively yields to
other requests. Note that since requests must be handled in order, you
need to be a bit careful to not block other requests (unless you
intentionally do that).

This basically does what it says on the tin. The behavior of everything
else is unchanged -- you can KeepControl, you can return false to say it
wasn't handled, etc.

The logic and control flow is very ugly, but it works, and already took
a long time to implement, so I'm unlikely to change it.

In working on this, I also found and fixed a bug that resulted in
correllation errors when handling join&sync.

kgo group tests still work against kfake's "hidden" main.go, and I have
tested SleepControl with/without KeepControl, and with/without returning
handled=true.
  • Loading branch information
twmb committed Dec 17, 2023
1 parent 070f8f9 commit 90bbc78
Show file tree
Hide file tree
Showing 17 changed files with 392 additions and 94 deletions.
4 changes: 2 additions & 2 deletions pkg/kfake/01_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func init() { regKey(1, 4, 13) }

func (c *Cluster) handleFetch(creq clientReq, w *watchFetch) (kmsg.Response, error) {
func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) {
var (
req = creq.kreq.(*kmsg.FetchRequest)
resp = req.ResponseKind().(*kmsg.FetchResponse)
Expand Down Expand Up @@ -182,7 +182,7 @@ type watchFetch struct {
need int
needp tps[int]
deadline time.Time
creq clientReq
creq *clientReq

in []*partData
cb func()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/08_offset_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(8, 0, 8) }

func (c *Cluster) handleOffsetCommit(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleOffsetCommit(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetCommitRequest)
resp := req.ResponseKind().(*kmsg.OffsetCommitResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/09_offset_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(9, 0, 8) }

func (c *Cluster) handleOffsetFetch(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleOffsetFetch(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetFetchRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/11_join_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(11, 0, 9) }

func (c *Cluster) handleJoinGroup(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleJoinGroup(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.JoinGroupRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/12_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(12, 0, 4) }

func (c *Cluster) handleHeartbeat(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleHeartbeat(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.HeartbeatRequest)
resp := req.ResponseKind().(*kmsg.HeartbeatResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/13_leave_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(13, 0, 5) }

func (c *Cluster) handleLeaveGroup(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleLeaveGroup(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.LeaveGroupRequest)
resp := req.ResponseKind().(*kmsg.LeaveGroupResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/14_sync_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(14, 0, 5) }

func (c *Cluster) handleSyncGroup(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleSyncGroup(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SyncGroupRequest)
resp := req.ResponseKind().(*kmsg.SyncGroupResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/15_describe_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(15, 0, 5) }

func (c *Cluster) handleDescribeGroups(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleDescribeGroups(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.DescribeGroupsRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/16_list_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(16, 0, 4) }

func (c *Cluster) handleListGroups(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleListGroups(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.ListGroupsRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/17_sasl_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(17, 1, 1) }

func (c *Cluster) handleSASLHandshake(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleSASLHandshake(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SASLHandshakeRequest)
resp := req.ResponseKind().(*kmsg.SASLHandshakeResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/36_sasl_authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func init() { regKey(36, 0, 2) }

func (c *Cluster) handleSASLAuthenticate(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleSASLAuthenticate(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SASLAuthenticateRequest)
resp := req.ResponseKind().(*kmsg.SASLAuthenticateResponse)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/42_delete_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

func init() { regKey(42, 0, 2) }

func (c *Cluster) handleDeleteGroups(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleDeleteGroups(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.DeleteGroupsRequest)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/47_offset_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func init() { regKey(47, 0, 0) }

func (c *Cluster) handleOffsetDelete(creq clientReq) (kmsg.Response, error) {
func (c *Cluster) handleOffsetDelete(creq *clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.OffsetDeleteRequest)
resp := req.ResponseKind().(*kmsg.OffsetDeleteResponse)

Expand Down
9 changes: 6 additions & 3 deletions pkg/kfake/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type (
cc *clientConn
kreq kmsg.Request
at time.Time
corr int32
cid string
corr int32
seq uint32
}

Expand All @@ -38,7 +38,7 @@ type (
}
)

func (creq clientReq) empty() bool { return creq.cc == nil || creq.kreq == nil }
func (creq *clientReq) empty() bool { return creq == nil || creq.cc == nil || creq.kreq == nil }

func (cc *clientConn) read() {
defer cc.conn.Close()
Expand Down Expand Up @@ -101,7 +101,7 @@ func (cc *clientConn) read() {
}

select {
case cc.c.reqCh <- clientReq{cc, kreq, time.Now(), corr, cid, seq}:
case cc.c.reqCh <- &clientReq{cc, kreq, time.Now(), cid, corr, seq}:
seq++
case <-cc.c.die:
return
Expand Down Expand Up @@ -141,6 +141,9 @@ func (cc *clientConn) write() {
case <-cc.c.die:
return
}
} else {
delete(oooresp, seq)
seq++
}
if err := resp.err; err != nil {
cc.c.cfg.logger.Logf(LogLevelInfo, "client %s request unable to be handled: %v", who, err)
Expand Down
Loading

0 comments on commit 90bbc78

Please sign in to comment.