Skip to content

Commit

Permalink
Redis benchmark helper tool (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Dec 14, 2024
1 parent e83dc78 commit ebebb82
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 0 deletions.
3 changes: 3 additions & 0 deletions _examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/sessions v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/mailru/easygo v0.0.0-20190618140210-3c14a0dc985f
github.com/nats-io/nats.go v1.36.0
github.com/prometheus/client_golang v1.20.5
github.com/quic-go/quic-go v0.42.0
github.com/stretchr/testify v1.10.0
github.com/vmihailenco/msgpack/v5 v5.3.1
go.uber.org/ratelimit v0.3.1
golang.org/x/oauth2 v0.22.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.35.1
Expand All @@ -32,6 +34,7 @@ require (
require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
github.com/FZambia/eagle v0.1.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions _examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA=
github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA=
github.com/FZambia/tarantool v0.2.2 h1:uC4clbBxkpvILYcHj4dktyYwUs57BeODbY/yWgH67pU=
github.com/FZambia/tarantool v0.2.2/go.mod h1:MSuWem4S/t7G+qxg8PZk8Mn25UfoXLYf+UxYFIfEydM=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff/go.mod h1:+RTT1BOk5P97fT2CiHkbFQwkK3mjsFAP6zCYV2aXtjw=
Expand Down Expand Up @@ -93,6 +95,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kidstuff/mongostore v0.0.0-20181113001930-e650cd85ee4b/go.mod h1:g2nVr8KZVXJSS97Jo8pJ0jgq29P6H7dG0oplUA86MQw=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
Expand Down Expand Up @@ -193,8 +197,12 @@ github.com/vmihailenco/msgpack/v5 v5.3.1 h1:0i85a4dsZh8mC//wmyyTEzidDLPQfQAxZIOL
github.com/vmihailenco/msgpack/v5 v5.3.1/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
Expand Down
270 changes: 270 additions & 0 deletions _examples/redis_benchmark/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package main

import (
"context"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"sync/atomic"
"syscall"
"time"

_ "net/http/pprof"

"github.com/centrifugal/centrifuge"
"github.com/kelseyhightower/envconfig"
"go.uber.org/ratelimit"
)

type Config struct {
Port int `envconfig:"PORT" default:"8000"`

RedisAddress []string `envconfig:"REDIS_ADDRESS" default:"127.0.0.1:6379"`

HistorySize int `envconfig:"HISTORY_SIZE" default:"0"`
HistoryTTL time.Duration `envconfig:"HISTORY_TTL" default:"60s"`

NumDifferentChannels int `envconfig:"NUM_DIFFERENT_CHANNELS" default:"1024"`

PublishRateLimit int `envconfig:"PUBLISH_RATE" default:"50000"`
SubscribeRateLimit int `envconfig:"SUBSCRIBE_RATE" default:"50000"`
UnsubscribeRateLimit int `envconfig:"UNSUBSCRIBE_RATE" default:"50000"`
HistoryRateLimit int `envconfig:"HISTORY_RATE" default:"0"`

MessageSize int `envconfig:"MESSAGE_SIZE" default:"128"`
}

func handleLog(e centrifuge.LogEntry) {
log.Printf("[centrifuge] %s: %v", e.Message, e.Fields)
}

func authMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
UserID: "42",
Info: []byte(`{"name": "Alexander"}`),
})
r = r.WithContext(ctx)
h.ServeHTTP(w, r)
})
}

func waitExitSignal(n *centrifuge.Node) {
sigCh := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
_ = n.Shutdown(context.Background())
done <- true
}()
<-done
}

func main() {
var cfg Config
err := envconfig.Process("", &cfg)
if err != nil {
log.Fatal(err)
}

log.Printf("NUM_DIFFERENT_CHANNELS=%d, MESSAGE_SIZE=%d, HISTORY_SIZE=%d, HISTORY_TTL=%s, "+
"PUBLISH_RATE=%d, SUBSCRIBE_RATE=%d, UNSUBSCRIBE_RATE=%d, HISTORY_RATE=%d", cfg.NumDifferentChannels,
cfg.MessageSize, cfg.HistorySize, cfg.HistoryTTL, cfg.PublishRateLimit, cfg.SubscribeRateLimit,
cfg.UnsubscribeRateLimit, cfg.HistoryRateLimit)

node, _ := centrifuge.New(centrifuge.Config{
LogLevel: centrifuge.LogLevelError,
LogHandler: handleLog,
})

node.OnConnect(func(client *centrifuge.Client) {
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
cb(centrifuge.SubscribeReply{}, nil)
})

client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
cb(centrifuge.PublishReply{}, nil)
})
})

var redisShardConfigs []centrifuge.RedisShardConfig

for _, addr := range cfg.RedisAddress {
if u, err := url.Parse(addr); err != nil {
log.Printf("connecting to Redis: %s", addr)
} else {
log.Printf("connecting to Redis: %s", u.Redacted())
}
redisShardConfigs = append(redisShardConfigs, centrifuge.RedisShardConfig{
Address: addr,
})
}

var redisShards []*centrifuge.RedisShard
for _, redisConf := range redisShardConfigs {
redisShard, err := centrifuge.NewRedisShard(node, redisConf)
if err != nil {
log.Fatal(err)
}
redisShards = append(redisShards, redisShard)
}

broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
// And configure a couple of shards to use.
Shards: redisShards,
})
if err != nil {
log.Fatal(err)
}
node.SetBroker(broker)

presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
Shards: redisShards,
})
if err != nil {
log.Fatal(err)
}
node.SetPresenceManager(presenceManager)

if err := node.Run(); err != nil {
log.Fatal(err)
}

var publishNum int64
var subNum int64
var unsubNum int64
var historyNum int64

if cfg.PublishRateLimit > 0 {
publishRateLimiter := ratelimit.New(cfg.PublishRateLimit, ratelimit.Per(time.Second))
var publishOptions []centrifuge.PublishOption
if cfg.HistorySize > 0 {
publishOptions = append(publishOptions, centrifuge.WithHistory(cfg.HistorySize, cfg.HistoryTTL))
}

for i := 0; i < cfg.NumDifferentChannels; i++ {
i := i
go func() {
msg := randString(cfg.MessageSize)
for {
publishRateLimiter.Take()
_, err := node.Publish(
"channel"+strconv.Itoa(i%cfg.NumDifferentChannels),
[]byte(`{"d": "`+msg+`"}`),
publishOptions...,
)
if err != nil {
log.Printf("error publishing to channel: %s", err)
}
atomic.AddInt64(&publishNum, 1)
}
}()
}
}

if cfg.SubscribeRateLimit > 0 {
subRateLimiter := ratelimit.New(cfg.SubscribeRateLimit, ratelimit.Per(time.Second))
for i := 0; i < cfg.NumDifferentChannels; i++ {
i := i
go func() {
for {
subRateLimiter.Take()
err := broker.Subscribe("channel" + strconv.Itoa(i%cfg.NumDifferentChannels))
if err != nil {
log.Printf("error subscribing to channel: %s", err)
}
atomic.AddInt64(&subNum, 1)
}
}()
}
}

if cfg.UnsubscribeRateLimit > 0 {
unsubRateLimiter := ratelimit.New(cfg.UnsubscribeRateLimit, ratelimit.Per(time.Second))
for i := 0; i < cfg.NumDifferentChannels; i++ {
i := i
go func() {
for {
unsubRateLimiter.Take()
err := broker.Unsubscribe("channel" + strconv.Itoa(i%cfg.NumDifferentChannels))
if err != nil {
log.Printf("error unsubscribing from channel: %s", err)
}
atomic.AddInt64(&unsubNum, 1)
}
}()
}
}

if cfg.HistoryRateLimit > 0 {
historyRateLimiter := ratelimit.New(cfg.HistoryRateLimit, ratelimit.Per(time.Second))
for i := 0; i < cfg.NumDifferentChannels; i++ {
i := i
go func() {
for {
historyRateLimiter.Take()
// No publications loaded here now, only stream position.
_, err := node.History("channel" + strconv.Itoa(i%cfg.NumDifferentChannels))
if err != nil {
log.Printf("error getting history from channel: %s", err)
}
atomic.AddInt64(&historyNum, 1)
}
}()
}
}

go func() {
time.Sleep(time.Second)
prevPublished := int64(0)
prevSubscribes := int64(0)
prevUnsubscribes := int64(0)
prevHistory := int64(0)
for {
currentPublished := atomic.LoadInt64(&publishNum)
currentSubscribes := atomic.LoadInt64(&subNum)
currentUnsubscribes := atomic.LoadInt64(&unsubNum)
currentHistory := atomic.LoadInt64(&historyNum)
log.Printf("Stats per second: published %d, subscribed: %d, unsubscribed: %d, history: %d",
currentPublished-prevPublished,
currentSubscribes-prevSubscribes,
currentUnsubscribes-prevUnsubscribes,
currentHistory-prevHistory,
)
prevPublished = currentPublished
prevSubscribes = currentSubscribes
prevUnsubscribes = currentUnsubscribes
prevHistory = currentHistory
time.Sleep(time.Second)
}
}()

http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
http.Handle("/", http.FileServer(http.Dir("./")))

go func() {
if err := http.ListenAndServe(":"+strconv.Itoa(cfg.Port), nil); err != nil {
log.Fatal(err)
}
}()

waitExitSignal(node)
log.Println("bye!")
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randString(n int) string {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[random.Intn(len(letterRunes))]
}
return string(b)
}
34 changes: 34 additions & 0 deletions _examples/redis_benchmark/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Tool to put Centrifuge specific load on Redis setup.

Once you run it you can estimate the load on Redis setup, Redis resource usage.

Example usage:

```bash
go build
REDIS_ADDRESS="redis://test:[email protected]:6379" ./redis-benchmark
```

Or with Redis Cluster:

```bash
REDIS_ADDRESS="redis+cluster://test:[email protected]:7000" ./redis-benchmark
```

Or with sharding over 2 isolated Redis instances:

```bash
REDIS_ADDRESS="redis://test:[email protected]:6379,redis://test:[email protected]:6380" ./redis-benchmark
```

With different bench parameters:

```bash
PUBLISH_RATE=200 SUBSCRIBE_RATE=100 UNSUBSCRIBE_RATE=100 REDIS_ADDRESS="redis://test:[email protected]:6379" ./redis-benchmark
```

Or with history streams enabled and history requests:

```bash
HISTORY_SIZE=100 HISTORY_TTL=60s HISTORY_RATE=10000 REDIS_ADDRESS="redis://test:[email protected]:6379" ./redis-benchmark
```

0 comments on commit ebebb82

Please sign in to comment.