Skip to content

Commit

Permalink
kfake: support KIP-951, fix OffsetForLeaderEpoch
Browse files Browse the repository at this point in the history
OffsetForLeaderEpoch was not actually correct; now, by searching for the
next epoch, the logic is simpler.
  • Loading branch information
twmb committed May 9, 2024
1 parent 1ed02eb commit b05c3b9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
21 changes: 19 additions & 2 deletions pkg/kfake/00_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kfake

import (
"hash/crc32"
"net"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kerr"
Expand All @@ -14,7 +16,7 @@ import (
// * Multiple batches in one produce
// * Compact

func init() { regKey(0, 3, 9) }
func init() { regKey(0, 3, 10) }

func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
var (
Expand Down Expand Up @@ -46,13 +48,25 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er
donet(t, errCode)
}
}
var includeBrokers bool
toresp := func() kmsg.Response {
for topic, partitions := range tdone {
st := kmsg.NewProduceResponseTopic()
st.Topic = topic
st.Partitions = partitions
resp.Topics = append(resp.Topics, st)
}
if includeBrokers {
for _, b := range c.bs {
sb := kmsg.NewProduceResponseBroker()
h, p, _ := net.SplitHostPort(b.ln.Addr().String())
p32, _ := strconv.Atoi(p)
sb.NodeID = b.node
sb.Host = h
sb.Port = int32(p32)
resp.Brokers = append(resp.Brokers, sb)
}
}
return resp
}

Expand All @@ -76,7 +90,10 @@ func (c *Cluster) handleProduce(b *broker, kreq kmsg.Request) (kmsg.Response, er
continue
}
if pd.leader != b {
donep(rt.Topic, rp, kerr.NotLeaderForPartition.Code)
p := donep(rt.Topic, rp, kerr.NotLeaderForPartition.Code)
p.CurrentLeader.LeaderID = pd.leader.node
p.CurrentLeader.LeaderEpoch = pd.epoch
includeBrokers = true
continue
}

Expand Down
24 changes: 22 additions & 2 deletions pkg/kfake/01_fetch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kfake

import (
"net"
"strconv"
"sync"
"time"

Expand All @@ -16,7 +18,7 @@ import (
// * Out of range fetch causes early return
// * Raw bytes of batch counts against wait bytes

func init() { regKey(1, 4, 13) }
func init() { regKey(1, 4, 16) }

func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, error) {
var (
Expand Down Expand Up @@ -132,6 +134,21 @@ func (c *Cluster) handleFetch(creq *clientReq, w *watchFetch) (kmsg.Response, er
return &st.Partitions[len(st.Partitions)-1]
}

var includeBrokers bool
defer func() {
if includeBrokers {
for _, b := range c.bs {
sb := kmsg.NewFetchResponseBroker()
h, p, _ := net.SplitHostPort(b.ln.Addr().String())
p32, _ := strconv.Atoi(p)
sb.NodeID = b.node
sb.Host = h
sb.Port = int32(p32)
resp.Brokers = append(resp.Brokers, sb)
}
}
}()

var batchesAdded int
full:
for _, rt := range req.Topics {
Expand All @@ -146,7 +163,10 @@ full:
continue
}
if pd.leader != creq.cc.b {
donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code)
p := donep(rt.Topic, rt.TopicID, rp.Partition, kerr.NotLeaderForPartition.Code)
p.CurrentLeader.LeaderID = pd.leader.node
p.CurrentLeader.LeaderEpoch = pd.epoch
includeBrokers = true
continue
}
sp := donep(rt.Topic, rt.TopicID, rp.Partition, 0)
Expand Down
28 changes: 19 additions & 9 deletions pkg/kfake/23_offset_for_leader_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,24 @@ func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg
continue
}

// If our epoch was bumped before anything was
// produced, return the epoch and a start offset of 0.
if len(pd.batches) == 0 {
sp.LeaderEpoch = pd.epoch
sp.EndOffset = 0
if rp.LeaderEpoch > pd.epoch {
sp.LeaderEpoch = -1
sp.EndOffset = -1
}
continue
}

// What is the largest epoch after the requested epoch?
nextEpoch := rp.LeaderEpoch + 1
idx, _ := sort.Find(len(pd.batches), func(idx int) int {
batchEpoch := pd.batches[idx].epoch
switch {
case rp.LeaderEpoch <= batchEpoch:
case nextEpoch <= batchEpoch:
return -1
default:
return 1
Expand All @@ -92,19 +105,16 @@ func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg
continue
}

// Requested epoch is before the LSO: return the requested
// epoch and the LSO.
if idx == 0 && pd.batches[0].epoch > rp.LeaderEpoch {
// Next epoch is actually the first epoch: return the
// requested epoch and the LSO.
if idx == 0 {
sp.LeaderEpoch = rp.LeaderEpoch
sp.EndOffset = pd.logStartOffset
continue
}

// The requested epoch exists and is not the latest
// epoch, we return the end offset being the first
// offset of the next epoch.
sp.LeaderEpoch = pd.batches[idx].epoch
sp.EndOffset = pd.batches[idx+1].FirstOffset
sp.LeaderEpoch = pd.batches[idx-1].epoch
sp.EndOffset = pd.batches[idx].FirstOffset
}
}
return resp, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/kfake/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32
return
}
pd.leader = br
pd.epoch++
})
return err
}
Expand Down

0 comments on commit b05c3b9

Please sign in to comment.