Skip to content

Commit

Permalink
kfake: Drop topic configuration validation
Browse files Browse the repository at this point in the history
This isn't used at the moment; it gets in the way of using kfake to stub out behaviour
that mocks custom cluster & topic config.
  • Loading branch information
jan-g committed Jun 13, 2024
1 parent 40589af commit a5ac198
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 68 deletions.
5 changes: 0 additions & 5 deletions pkg/kfake/19_create_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (c *Cluster) handleCreateTopics(b *broker, kreq kmsg.Request) (kmsg.Respons
uniq[rt.Topic] = struct{}{}
}

topics:
for _, rt := range req.Topics {
if _, ok := c.data.tps.gett(rt.Topic); ok {
donet(rt.Topic, kerr.TopicAlreadyExists.Code)
Expand All @@ -66,10 +65,6 @@ topics:
}
configs := make(map[string]*string)
for _, c := range rt.Configs {
if ok := validateSetTopicConfig(c.Name, c.Value); !ok {
donet(rt.Topic, kerr.InvalidConfig.Code)
continue topics
}
configs[c.Name] = c.Value
}
c.data.mkt(rt.Topic, int(rt.NumPartitions), int(rt.ReplicationFactor), configs)
Expand Down
62 changes: 0 additions & 62 deletions pkg/kfake/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math/rand"
"sort"
"strconv"
"strings"
"time"

"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -244,9 +243,6 @@ func (d *data) configs(t string, fn func(k string, v *string, src kmsg.ConfigSou

// Unlike Kafka, we validate the value before allowing it to be set.
func (c *Cluster) setBrokerConfig(k string, v *string, dry bool) bool {
if !validateSetBrokerConfig(k, v) {
return false
}
if dry {
return true
}
Expand All @@ -255,9 +251,6 @@ func (c *Cluster) setBrokerConfig(k string, v *string, dry bool) bool {
}

func (d *data) setTopicConfig(t string, k string, v *string, dry bool) bool {
if !validateSetTopicConfig(k, v) {
return false
}
if dry {
return true
}
Expand All @@ -268,61 +261,6 @@ func (d *data) setTopicConfig(t string, k string, v *string, dry bool) bool {
return true
}

func validateSetTopicConfig(k string, v *string) bool {
if _, ok := validTopicConfigs[k]; !ok {
return false
}
fn, ok := validateSetConfig[k]
if !ok {
return false
}
return fn(v)
}

func validateSetBrokerConfig(k string, v *string) bool {
if _, ok := validBrokerConfigs[k]; !ok {
return false
}
fn, ok := validateSetConfig[k]
if !ok {
return false
}
return fn(v)
}

// Validation functions for all configs we support setting. Keys not in this
// map are not settable.
var validateSetConfig = map[string]func(*string) bool{
"cleanup.policy": func(v *string) bool {
if v == nil {
return false
}
s := strings.Split(*v, ",")
for _, policy := range s {
if policy != "delete" && policy != "compact" {
return false
}
}
return true
},

"compression.type": staticConfig("uncompressed", "lz4", "zstd", "snappy", "gzip", "producer"),

"max.message.bytes": numberConfig(0, true, 0, false),
"message.timestamp.type": staticConfig("CreateTime", "LogAppendTime"),
"min.insync.replicas": numberConfig(1, true, 0, false),
"retention.bytes": numberConfig(-1, true, 0, false),
"retention.ms": numberConfig(-1, true, 0, false),

"default.replication.factor": numberConfig(1, true, 0, false),
"fetch.max.bytes": numberConfig(1024, true, 0, false),
"log.dir": func(v *string) bool { return v != nil },
"log.message.timestamp.type": staticConfig("CreateTime", "LogAppendTime"),
"log.retention.bytes": numberConfig(-1, true, 0, false),
"log.retention.ms": numberConfig(-1, true, 0, false),
"message.max.bytes": numberConfig(0, true, 0, false),
}

// All valid topic configs we support, as well as the equivalent broker
// config if there is one.
var validTopicConfigs = map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kfake/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (gs groupState) String() string {

func (c *Cluster) coordinator(id string) *broker {
gen := c.coordinatorGen.Load()
n := hashString(fmt.Sprint("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs))
n := hashString(fmt.Sprintf("%d", gen)+"\x00\x00"+id) % uint64(len(c.bs))
return c.bs[n]
}

Expand Down

0 comments on commit a5ac198

Please sign in to comment.