From b05c3b9a0aee50b05cce40fd042bfee7f82eb6b5 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 8 May 2024 23:53:53 -0600 Subject: [PATCH] kfake: support KIP-951, fix OffsetForLeaderEpoch OffsetForLeaderEpoch was not actually correct; now, by searching for the next epoch, the logic is simpler. --- pkg/kfake/00_produce.go | 21 +++++++++++++++++-- pkg/kfake/01_fetch.go | 24 +++++++++++++++++++-- pkg/kfake/23_offset_for_leader_epoch.go | 28 +++++++++++++++++-------- pkg/kfake/cluster.go | 1 + 4 files changed, 61 insertions(+), 13 deletions(-) diff --git a/pkg/kfake/00_produce.go b/pkg/kfake/00_produce.go index e2c1c691..9aeb7668 100644 --- a/pkg/kfake/00_produce.go +++ b/pkg/kfake/00_produce.go @@ -2,6 +2,8 @@ package kfake import ( "hash/crc32" + "net" + "strconv" "time" "github.com/twmb/franz-go/pkg/kerr" @@ -14,7 +16,7 @@ import ( // * Multiple batches in one produce // * Compact -func init() { regKey(0, 3, 9) } +func init() { regKey(0, 3, 10) } func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, error) { var ( @@ -46,6 +48,7 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er donet(t, errCode) } } + var includeBrokers bool toresp := func() kmsg.Response { for topic, partitions := range tdone { st := kmsg.NewProduceResponseTopic() @@ -53,6 +56,17 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er st.Partitions = partitions resp.Topics = append(resp.Topics, st) } + if includeBrokers { + for _, b := range c.bs { + sb := kmsg.NewProduceResponseBroker() + h, p, _ := net.SplitHostPort(b.ln.Addr().String()) + p32, _ := strconv.Atoi(p) + sb.NodeID = b.node + sb.Host = h + sb.Port = int32(p32) + resp.Brokers = append(resp.Brokers, sb) + } + } return resp } @@ -76,7 +90,10 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er continue } if pd.leader != b { - donep(rt.Topic, rp, kerr.NotLeaderForPartition.Code) + p := donep(rt.Topic, rp, kerr.NotLeaderForPartition.Code) + p.CurrentLeader.LeaderID = pd.leader.node + p.CurrentLeader.LeaderEpoch = pd.epoch + includeBrokers = true continue } diff --git a/pkg/kfake/01_fetch.go b/pkg/kfake/01_fetch.go index f0fc4f07..53f9a8e6 100644 --- a/pkg/kfake/01_fetch.go +++ b/pkg/kfake/01_fetch.go @@ -1,6 +1,8 @@ package kfake import ( + "net" + "strconv" "sync" "time" @@ -16,7 +18,7 @@ import ( // * Out of range fetch causes early return // * Raw bytes of batch counts against wait bytes -func init() { regKey(1, 4, 13) } +func init() { regKey(1, 4, 16) } func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) { var ( @@ -132,6 +134,21 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er return &st.Partitions[len(st.Partitions)-1] } + var includeBrokers bool + defer func() { + if includeBrokers { + for _, b := range c.bs { + sb := kmsg.NewFetchResponseBroker() + h, p, _ := net.SplitHostPort(b.ln.Addr().String()) + p32, _ := strconv.Atoi(p) + sb.NodeID = b.node + sb.Host = h + sb.Port = int32(p32) + resp.Brokers = append(resp.Brokers, sb) + } + } + }() + var batchesAdded int full: for _, rt := range req.Topics { @@ -146,7 +163,10 @@ full: continue } if pd.leader != creq.cc.b { - donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code) + p := donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code) + p.CurrentLeader.LeaderID = pd.leader.node + p.CurrentLeader.LeaderEpoch = pd.epoch + includeBrokers = true continue } sp := donep(rt.Topic, rt.TopicID, rp.Partition, 0) diff --git a/pkg/kfake/23_offset_for_leader_epoch.go b/pkg/kfake/23_offset_for_leader_epoch.go index 6b6d768e..f531ecf8 100644 --- a/pkg/kfake/23_offset_for_leader_epoch.go +++ b/pkg/kfake/23_offset_for_leader_epoch.go @@ -74,11 +74,24 @@ func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg continue } + // If our epoch was bumped before anything was + // produced, return the epoch and a start offset of 0. + if len(pd.batches) == 0 { + sp.LeaderEpoch = pd.epoch + sp.EndOffset = 0 + if rp.LeaderEpoch > pd.epoch { + sp.LeaderEpoch = -1 + sp.EndOffset = -1 + } + continue + } + // What is the largest epoch after the requested epoch? + nextEpoch := rp.LeaderEpoch + 1 idx, _ := sort.Find(len(pd.batches), func(idx int) int { batchEpoch := pd.batches[idx].epoch switch { - case rp.LeaderEpoch <= batchEpoch: + case nextEpoch <= batchEpoch: return -1 default: return 1 @@ -92,19 +105,16 @@ func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg continue } - // Requested epoch is before the LSO: return the requested - // epoch and the LSO. - if idx == 0 && pd.batches[0].epoch > rp.LeaderEpoch { + // Next epoch is actually the first epoch: return the + // requested epoch and the LSO. + if idx == 0 { sp.LeaderEpoch = rp.LeaderEpoch sp.EndOffset = pd.logStartOffset continue } - // The requested epoch exists and is not the latest - // epoch, we return the end offset being the first - // offset of the next epoch. - sp.LeaderEpoch = pd.batches[idx].epoch - sp.EndOffset = pd.batches[idx+1].FirstOffset + sp.LeaderEpoch = pd.batches[idx-1].epoch + sp.EndOffset = pd.batches[idx].FirstOffset } } return resp, nil diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go index e1a92969..275b1440 100644 --- a/pkg/kfake/cluster.go +++ b/pkg/kfake/cluster.go @@ -958,6 +958,7 @@ func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32 return } pd.leader = br + pd.epoch++ }) return err }