diff --git a/_examples/go.mod b/_examples/go.mod index 56f7a43c..06dca683 100644 --- a/_examples/go.mod +++ b/_examples/go.mod @@ -17,12 +17,14 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/sessions v1.3.0 github.com/gorilla/websocket v1.5.0 + github.com/kelseyhightower/envconfig v1.4.0 github.com/mailru/easygo v0.0.0-20190618140210-3c14a0dc985f github.com/nats-io/nats.go v1.36.0 github.com/prometheus/client_golang v1.20.5 github.com/quic-go/quic-go v0.42.0 github.com/stretchr/testify v1.10.0 github.com/vmihailenco/msgpack/v5 v5.3.1 + go.uber.org/ratelimit v0.3.1 golang.org/x/oauth2 v0.22.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.35.1 @@ -32,6 +34,7 @@ require ( require ( cloud.google.com/go/compute/metadata v0.3.0 // indirect github.com/FZambia/eagle v0.1.0 // indirect + github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect diff --git a/_examples/go.sum b/_examples/go.sum index be323b07..26c57016 100644 --- a/_examples/go.sum +++ b/_examples/go.sum @@ -4,6 +4,8 @@ github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA= github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA= github.com/FZambia/tarantool v0.2.2 h1:uC4clbBxkpvILYcHj4dktyYwUs57BeODbY/yWgH67pU= github.com/FZambia/tarantool v0.2.2/go.mod h1:MSuWem4S/t7G+qxg8PZk8Mn25UfoXLYf+UxYFIfEydM= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boj/redistore v0.0.0-20180917114910-cd5dcc76aeff/go.mod h1:+RTT1BOk5P97fT2CiHkbFQwkK3mjsFAP6zCYV2aXtjw= @@ -93,6 +95,8 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kidstuff/mongostore v0.0.0-20181113001930-e650cd85ee4b/go.mod h1:g2nVr8KZVXJSS97Jo8pJ0jgq29P6H7dG0oplUA86MQw= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= @@ -193,8 +197,12 @@ github.com/vmihailenco/msgpack/v5 v5.3.1 h1:0i85a4dsZh8mC//wmyyTEzidDLPQfQAxZIOL github.com/vmihailenco/msgpack/v5 v5.3.1/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= +go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= diff --git a/_examples/redis_benchmark/main.go b/_examples/redis_benchmark/main.go new file mode 100644 index 00000000..dd4cfdba --- /dev/null +++ b/_examples/redis_benchmark/main.go @@ -0,0 +1,270 @@ +package main + +import ( + "context" + "log" + "math/rand" + "net/http" + "net/url" + "os" + "os/signal" + "strconv" + "sync/atomic" + "syscall" + "time" + + _ "net/http/pprof" + + "github.com/centrifugal/centrifuge" + "github.com/kelseyhightower/envconfig" + "go.uber.org/ratelimit" +) + +type Config struct { + Port int `envconfig:"PORT" default:"8000"` + + RedisAddress []string `envconfig:"REDIS_ADDRESS" default:"127.0.0.1:6379"` + + HistorySize int `envconfig:"HISTORY_SIZE" default:"0"` + HistoryTTL time.Duration `envconfig:"HISTORY_TTL" default:"60s"` + + NumDifferentChannels int `envconfig:"NUM_DIFFERENT_CHANNELS" default:"1024"` + + PublishRateLimit int `envconfig:"PUBLISH_RATE" default:"50000"` + SubscribeRateLimit int `envconfig:"SUBSCRIBE_RATE" default:"50000"` + UnsubscribeRateLimit int `envconfig:"UNSUBSCRIBE_RATE" default:"50000"` + HistoryRateLimit int `envconfig:"HISTORY_RATE" default:"0"` + + MessageSize int `envconfig:"MESSAGE_SIZE" default:"128"` +} + +func handleLog(e centrifuge.LogEntry) { + log.Printf("[centrifuge] %s: %v", e.Message, e.Fields) +} + +func authMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + ctx = centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ + UserID: "42", + Info: []byte(`{"name": "Alexander"}`), + }) + r = r.WithContext(ctx) + h.ServeHTTP(w, r) + }) +} + +func waitExitSignal(n *centrifuge.Node) { + sigCh := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + _ = n.Shutdown(context.Background()) + done <- true + }() + <-done +} + +func main() { + var cfg Config + err := envconfig.Process("", &cfg) + if err != nil { + log.Fatal(err) + } + + log.Printf("NUM_DIFFERENT_CHANNELS=%d, MESSAGE_SIZE=%d, HISTORY_SIZE=%d, HISTORY_TTL=%s, "+ + "PUBLISH_RATE=%d, SUBSCRIBE_RATE=%d, UNSUBSCRIBE_RATE=%d, HISTORY_RATE=%d", cfg.NumDifferentChannels, + cfg.MessageSize, cfg.HistorySize, cfg.HistoryTTL, cfg.PublishRateLimit, cfg.SubscribeRateLimit, + cfg.UnsubscribeRateLimit, cfg.HistoryRateLimit) + + node, _ := centrifuge.New(centrifuge.Config{ + LogLevel: centrifuge.LogLevelError, + LogHandler: handleLog, + }) + + node.OnConnect(func(client *centrifuge.Client) { + client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { + cb(centrifuge.SubscribeReply{}, nil) + }) + + client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { + cb(centrifuge.PublishReply{}, nil) + }) + }) + + var redisShardConfigs []centrifuge.RedisShardConfig + + for _, addr := range cfg.RedisAddress { + if u, err := url.Parse(addr); err != nil { + log.Printf("connecting to Redis: %s", addr) + } else { + log.Printf("connecting to Redis: %s", u.Redacted()) + } + redisShardConfigs = append(redisShardConfigs, centrifuge.RedisShardConfig{ + Address: addr, + }) + } + + var redisShards []*centrifuge.RedisShard + for _, redisConf := range redisShardConfigs { + redisShard, err := centrifuge.NewRedisShard(node, redisConf) + if err != nil { + log.Fatal(err) + } + redisShards = append(redisShards, redisShard) + } + + broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{ + // And configure a couple of shards to use. + Shards: redisShards, + }) + if err != nil { + log.Fatal(err) + } + node.SetBroker(broker) + + presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{ + Shards: redisShards, + }) + if err != nil { + log.Fatal(err) + } + node.SetPresenceManager(presenceManager) + + if err := node.Run(); err != nil { + log.Fatal(err) + } + + var publishNum int64 + var subNum int64 + var unsubNum int64 + var historyNum int64 + + if cfg.PublishRateLimit > 0 { + publishRateLimiter := ratelimit.New(cfg.PublishRateLimit, ratelimit.Per(time.Second)) + var publishOptions []centrifuge.PublishOption + if cfg.HistorySize > 0 { + publishOptions = append(publishOptions, centrifuge.WithHistory(cfg.HistorySize, cfg.HistoryTTL)) + } + + for i := 0; i < cfg.NumDifferentChannels; i++ { + i := i + go func() { + msg := randString(cfg.MessageSize) + for { + publishRateLimiter.Take() + _, err := node.Publish( + "channel"+strconv.Itoa(i%cfg.NumDifferentChannels), + []byte(`{"d": "`+msg+`"}`), + publishOptions..., + ) + if err != nil { + log.Printf("error publishing to channel: %s", err) + } + atomic.AddInt64(&publishNum, 1) + } + }() + } + } + + if cfg.SubscribeRateLimit > 0 { + subRateLimiter := ratelimit.New(cfg.SubscribeRateLimit, ratelimit.Per(time.Second)) + for i := 0; i < cfg.NumDifferentChannels; i++ { + i := i + go func() { + for { + subRateLimiter.Take() + err := broker.Subscribe("channel" + strconv.Itoa(i%cfg.NumDifferentChannels)) + if err != nil { + log.Printf("error subscribing to channel: %s", err) + } + atomic.AddInt64(&subNum, 1) + } + }() + } + } + + if cfg.UnsubscribeRateLimit > 0 { + unsubRateLimiter := ratelimit.New(cfg.UnsubscribeRateLimit, ratelimit.Per(time.Second)) + for i := 0; i < cfg.NumDifferentChannels; i++ { + i := i + go func() { + for { + unsubRateLimiter.Take() + err := broker.Unsubscribe("channel" + strconv.Itoa(i%cfg.NumDifferentChannels)) + if err != nil { + log.Printf("error unsubscribing from channel: %s", err) + } + atomic.AddInt64(&unsubNum, 1) + } + }() + } + } + + if cfg.HistoryRateLimit > 0 { + historyRateLimiter := ratelimit.New(cfg.HistoryRateLimit, ratelimit.Per(time.Second)) + for i := 0; i < cfg.NumDifferentChannels; i++ { + i := i + go func() { + for { + historyRateLimiter.Take() + // No publications loaded here now, only stream position. + _, err := node.History("channel" + strconv.Itoa(i%cfg.NumDifferentChannels)) + if err != nil { + log.Printf("error getting history from channel: %s", err) + } + atomic.AddInt64(&historyNum, 1) + } + }() + } + } + + go func() { + time.Sleep(time.Second) + prevPublished := int64(0) + prevSubscribes := int64(0) + prevUnsubscribes := int64(0) + prevHistory := int64(0) + for { + currentPublished := atomic.LoadInt64(&publishNum) + currentSubscribes := atomic.LoadInt64(&subNum) + currentUnsubscribes := atomic.LoadInt64(&unsubNum) + currentHistory := atomic.LoadInt64(&historyNum) + log.Printf("Stats per second: published %d, subscribed: %d, unsubscribed: %d, history: %d", + currentPublished-prevPublished, + currentSubscribes-prevSubscribes, + currentUnsubscribes-prevUnsubscribes, + currentHistory-prevHistory, + ) + prevPublished = currentPublished + prevSubscribes = currentSubscribes + prevUnsubscribes = currentUnsubscribes + prevHistory = currentHistory + time.Sleep(time.Second) + } + }() + + http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))) + http.Handle("/", http.FileServer(http.Dir("./"))) + + go func() { + if err := http.ListenAndServe(":"+strconv.Itoa(cfg.Port), nil); err != nil { + log.Fatal(err) + } + }() + + waitExitSignal(node) + log.Println("bye!") +} + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randString(n int) string { + random := rand.New(rand.NewSource(time.Now().UnixNano())) + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[random.Intn(len(letterRunes))] + } + return string(b) +} diff --git a/_examples/redis_benchmark/readme.md b/_examples/redis_benchmark/readme.md new file mode 100644 index 00000000..2eecf017 --- /dev/null +++ b/_examples/redis_benchmark/readme.md @@ -0,0 +1,34 @@ +Tool to put Centrifuge specific load on Redis setup. + +Once you run it you can estimate the load on Redis setup, Redis resource usage. + +Example usage: + +```bash +go build +REDIS_ADDRESS="redis://test:test@127.0.0.1:6379" ./redis-benchmark +``` + +Or with Redis Cluster: + +```bash +REDIS_ADDRESS="redis+cluster://test:test@127.0.0.1:7000" ./redis-benchmark +``` + +Or with sharding over 2 isolated Redis instances: + +```bash +REDIS_ADDRESS="redis://test:test@127.0.0.1:6379,redis://test:test@127.0.0.1:6380" ./redis-benchmark +``` + +With different bench parameters: + +```bash +PUBLISH_RATE=200 SUBSCRIBE_RATE=100 UNSUBSCRIBE_RATE=100 REDIS_ADDRESS="redis://test:test@127.0.0.1:6379" ./redis-benchmark +``` + +Or with history streams enabled and history requests: + +```bash +HISTORY_SIZE=100 HISTORY_TTL=60s HISTORY_RATE=10000 REDIS_ADDRESS="redis://test:test@127.0.0.1:6379" ./redis-benchmark +```