From 58456805b9a02065f8926b8f54792021bf12fec7 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Mon, 11 Nov 2024 20:43:23 +0100 Subject: [PATCH] Circuit breaker playground (#38) * Circuit breaker playground * Circuit breaker playground --- .../circuit-breaker-playground/README.md | 5 + _examples/circuit-breaker-playground/main.go | 149 ++++++++++++++++++ .../circuit-breaker-playground/middleware.go | 42 +++++ _examples/go.mod | 42 +++++ _examples/go.sum | 92 +++++++++++ curl/cmd.go | 12 +- fasthttp/job.go | 7 +- fasthttp/job_test.go | 4 +- loadgen/app.go | 4 + loadgen/flags.go | 5 + loadgen/run.go | 10 ++ nethttp/job.go | 27 +++- 12 files changed, 379 insertions(+), 20 deletions(-) create mode 100644 _examples/circuit-breaker-playground/README.md create mode 100644 _examples/circuit-breaker-playground/main.go create mode 100644 _examples/circuit-breaker-playground/middleware.go create mode 100644 _examples/go.mod create mode 100644 _examples/go.sum diff --git a/_examples/circuit-breaker-playground/README.md b/_examples/circuit-breaker-playground/README.md new file mode 100644 index 0000000..c0f8522 --- /dev/null +++ b/_examples/circuit-breaker-playground/README.md @@ -0,0 +1,5 @@ +# Circuit Breaker Playground + +This application simulates circuit breaker reaction on an unhealthy application. + +You can control server latency with Up/Down buttons in real time. \ No newline at end of file diff --git a/_examples/circuit-breaker-playground/main.go b/_examples/circuit-breaker-playground/main.go new file mode 100644 index 0000000..424b8b1 --- /dev/null +++ b/_examples/circuit-breaker-playground/main.go @@ -0,0 +1,149 @@ +package main + +import ( + "context" + "fmt" + "math/rand/v2" + "net/http" + "net/http/httptest" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/alecthomas/kingpin/v2" + "github.com/gizak/termui/v3/widgets" + "github.com/sony/gobreaker" + "github.com/vearutop/plt/curl" + "github.com/vearutop/plt/loadgen" + "github.com/vearutop/plt/nethttp" +) + +func main() { + lf := loadgen.Flags{} + lf.Register() + + var ( + // Response time window, normally distributed. + minResp = int64(300 * time.Millisecond) + maxResp = int64(510 * time.Millisecond) + + // Atomic counters. + cbFailed int64 + cbPassed int64 + cbState int64 + + // Response time considered a timeout by HTTP client. + timeout = 500 * time.Millisecond + + // ReadyToTrip params. + requestsThreshold = uint32(100) + errorThreshold = 0.03 + + mu sync.Mutex + readyToTripMsg string + ) + + cbSettings := gobreaker.Settings{ + // Name is the name of the CircuitBreaker. + Name: "acme", + + // MaxRequests is the maximum number of requests allowed to pass through + // when the CircuitBreaker is half-open. + // If MaxRequests is 0, the CircuitBreaker allows only 1 request. + MaxRequests: 500, + + // Interval is the cyclic period of the closed state + // for the CircuitBreaker to clear the internal Counts. + // If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state. + Interval: 2000 * time.Millisecond, + + // Timeout is the period of the open state, + // after which the state of the CircuitBreaker becomes half-open. + // If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds. + Timeout: 3 * time.Second, + + // OnStateChange is called whenever the state of the CircuitBreaker changes. + OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) { + atomic.StoreInt64(&cbState, int64(to)) + }, + + // ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state. + // If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state. + // If ReadyToTrip is nil, default ReadyToTrip is used. + // Default ReadyToTrip returns true when the number of consecutive failures is more than 5. + ReadyToTrip: func(counts gobreaker.Counts) bool { + enoughRequests := counts.Requests > requestsThreshold + errorRate := float64(counts.TotalFailures) / float64(counts.Requests) + reachedFailureLvl := errorRate >= errorThreshold + + mu.Lock() + defer mu.Unlock() + + readyToTripMsg = fmt.Sprintf("%d/%d failed (%.2f%%), %s", + counts.TotalFailures, counts.Requests, 100*errorRate, time.Now().Format(time.TimeOnly)) + return enoughRequests && reachedFailureLvl + }, + } + + // This handler returns 200 OK after a random delay between minResp and maxResp. + h := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + r := rand.Float64() + t := float64(atomic.LoadInt64(&minResp)) + float64(atomic.LoadInt64(&maxResp)-atomic.LoadInt64(&minResp))*r + + time.Sleep(time.Duration(t)) + }) + + srv := httptest.NewServer(h) + + // Customizing Live UI. + lf.PrepareLoadLimitsWidget = func(paragraph *widgets.Paragraph) { + mu.Lock() + defer mu.Unlock() + + paragraph.Title = "Response Time" + paragraph.Text = fmt.Sprintf("Max resp: %s, /: ±5%%", time.Duration(atomic.LoadInt64(&maxResp)).Truncate(time.Millisecond).String()) + paragraph.Text += fmt.Sprintf("\nCB %s, f: %d, p: %d\nReady to trip: %s", + gobreaker.State(atomic.LoadInt64(&cbState)).String(), + atomic.LoadInt64(&cbFailed), + atomic.LoadInt64(&cbPassed), + readyToTripMsg, + ) + } + + lf.KeyPressed[""] = func() { + atomic.StoreInt64(&maxResp, int64(float64(atomic.LoadInt64(&maxResp))*1.05)) + } + + lf.KeyPressed[""] = func() { + atomic.StoreInt64(&maxResp, int64(float64(atomic.LoadInt64(&maxResp))*0.95)) + } + + // Applying transport middleware. + curl.AddCommand(&lf, func(lf *loadgen.Flags, f *nethttp.Flags, j loadgen.JobProducer) { + if nj, ok := j.(*nethttp.JobProducer); ok { + nj.PrepareRoundTripper = func(tr http.RoundTripper) http.RoundTripper { + return CircuitBreakerMiddleware(cbSettings, &cbFailed)( + roundTripperFunc(func(r *http.Request) (*http.Response, error) { + ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + + atomic.AddInt64(&cbPassed, 1) + + return tr.RoundTrip(r.WithContext(ctx)) + }), + ) + } + } + }) + + // Preparing command line arguments. + os.Args = append(os.Args, + "--live-ui", + "--rate-limit=100", + "--number=1000000", + "curl", srv.URL) + + // Running the app.q + kingpin.Parse() +} diff --git a/_examples/circuit-breaker-playground/middleware.go b/_examples/circuit-breaker-playground/middleware.go new file mode 100644 index 0000000..d489112 --- /dev/null +++ b/_examples/circuit-breaker-playground/middleware.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "net/http" + "sync/atomic" + + "github.com/sony/gobreaker" +) + +// CircuitBreakerMiddleware is a http.RoundTripper. +func CircuitBreakerMiddleware( + settings gobreaker.Settings, + cbFailed *int64, +) func(tripper http.RoundTripper) http.RoundTripper { + cb := gobreaker.NewTwoStepCircuitBreaker(settings) + return func(tripper http.RoundTripper) http.RoundTripper { + return roundTripperFunc(func(r *http.Request) (*http.Response, error) { + done, err := cb.Allow() + // Error means circuit breaker is open and request should fail immediately. + if err != nil { + atomic.AddInt64(cbFailed, 1) + + return nil, fmt.Errorf("circuit breaker %s: %w", settings.Name, err) + } + resp, err := tripper.RoundTrip(r) + + // Done is nil if err is not. + if done != nil { + done(err == nil) + } + + return resp, err + }) + } +} + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} diff --git a/_examples/go.mod b/_examples/go.mod new file mode 100644 index 0000000..05aebfd --- /dev/null +++ b/_examples/go.mod @@ -0,0 +1,42 @@ +module examples + +go 1.23.1 + +replace github.com/vearutop/plt => ../ + +require ( + github.com/alecthomas/kingpin/v2 v2.4.0 + github.com/gizak/termui/v3 v3.1.0 + github.com/sony/gobreaker v1.0.0 + github.com/vearutop/plt v0.3.12 +) + +require ( + github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/bool64/dev v0.2.36 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/google/pprof v0.0.0-20240903155634-a8630aee4ab9 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect + github.com/nsf/termbox-go v1.1.1 // indirect + github.com/onsi/ginkgo/v2 v2.20.2 // indirect + github.com/quic-go/qpack v0.5.1 // indirect + github.com/quic-go/quic-go v0.47.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.55.0 // indirect + github.com/vearutop/dynhist-go v1.2.2 // indirect + github.com/xhit/go-str2duration/v2 v2.1.0 // indirect + go.uber.org/mock v0.4.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect + golang.org/x/mod v0.21.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/time v0.6.0 // indirect + golang.org/x/tools v0.25.0 // indirect +) diff --git a/_examples/go.sum b/_examples/go.sum new file mode 100644 index 0000000..2d96497 --- /dev/null +++ b/_examples/go.sum @@ -0,0 +1,92 @@ +github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY= +github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE= +github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrIewtiFmMK5RXHej2XnoXNhxVsAYUfg= +github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/bool64/dev v0.2.36 h1:yU3bbOTujoxhWnt8ig8t94PVmZXIkCaRj9C57OtqJBY= +github.com/bool64/dev v0.2.36/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gizak/termui/v3 v3.1.0 h1:ZZmVDgwHl7gR7elfKf1xc4IudXZ5qqfDh4wExk4Iajc= +github.com/gizak/termui/v3 v3.1.0/go.mod h1:bXQEBkJpzxUAKf0+xq9MSWAvWZlE7c+aidmyFlkYTrY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20240903155634-a8630aee4ab9 h1:q5g0N9eal4bmJwXHC5z0QCKs8qhS35hFfq0BAYsIwZI= +github.com/google/pprof v0.0.0-20240903155634-a8630aee4ab9/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzCv8LZP15IdmG+YdwD2luVPHITV96TkirNBM= +github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= +github.com/nsf/termbox-go v0.0.0-20190121233118-02980233997d/go.mod h1:IuKpRQcYE1Tfu+oAQqaLisqDeXgjyyltCfsaoYN18NQ= +github.com/nsf/termbox-go v1.1.1 h1:nksUPLCb73Q++DwbYUBEglYBRPZyoXJdrj5L+TkjyZY= +github.com/nsf/termbox-go v1.1.1/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo= +github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= +github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= +github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= +github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= +github.com/quic-go/quic-go v0.47.0 h1:yXs3v7r2bm1wmPTYNLKAAJTHMYkPEsfYJmTazXrCZ7Y= +github.com/quic-go/quic-go v0.47.0/go.mod h1:3bCapYsJvXGZcipOHuu7plYtaV6tnF+z7wIFsU0WK9E= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8= +github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM= +github.com/vearutop/dynhist-go v1.2.2 h1:pKPC2rYHdzpkasztNJmjE7m9DbUJDbMhZi2ZxaUvmS4= +github.com/vearutop/dynhist-go v1.2.2/go.mod h1:liiiYiwAi8ixC3DbkxooEhASTF6ysJSXy+piCrBtxEg= +github.com/vearutop/plt v0.3.12 h1:m1QdFB3zZyZ0rtk8vjVL+jwOCKmMuARVN/uXOEdX2zo= +github.com/vearutop/plt v0.3.12/go.mod h1:tzjjDW70+eb8Lyf+/0KMZ9P5NXZ4PpdI/Lo98mH/gxk= +github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= +github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= +golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/curl/cmd.go b/curl/cmd.go index dda7547..59905fb 100644 --- a/curl/cmd.go +++ b/curl/cmd.go @@ -204,23 +204,15 @@ func run(lf loadgen.Flags, f nethttp.Flags, options ...func(lf *loadgen.Flags, f err error ) - for _, o := range options { - o(&lf, &f, nil) - } - if f.Fast { - if j, err = fasthttp.NewJobProducer(f); err != nil { + if j, err = fasthttp.NewJobProducer(f, lf, options...); err != nil { return fmt.Errorf("failed to init job producer: %w", err) } } else { - if j, err = nethttp.NewJobProducer(f, lf); err != nil { + if j, err = nethttp.NewJobProducer(f, lf, options...); err != nil { return fmt.Errorf("failed to init job producer: %w", err) } } - for _, o := range options { - o(&lf, &f, j) - } - return loadgen.Run(lf, j) } diff --git a/fasthttp/job.go b/fasthttp/job.go index 3d81ebc..9ba6d72 100644 --- a/fasthttp/job.go +++ b/fasthttp/job.go @@ -12,6 +12,7 @@ import ( "time" "github.com/valyala/fasthttp" + "github.com/vearutop/plt/loadgen" "github.com/vearutop/plt/nethttp" "github.com/vearutop/plt/report" ) @@ -73,7 +74,7 @@ func (c countingConn) Write(b []byte) (n int, err error) { } // NewJobProducer creates load generator. -func NewJobProducer(f nethttp.Flags) (*JobProducer, error) { +func NewJobProducer(f nethttp.Flags, lf loadgen.Flags, options ...func(lf *loadgen.Flags, f *nethttp.Flags, j loadgen.JobProducer)) (*JobProducer, error) { u, err := url.Parse(f.URL) if err != nil { return nil, fmt.Errorf("failed to parse URL: %w", err) @@ -110,6 +111,10 @@ func NewJobProducer(f nethttp.Flags) (*JobProducer, error) { } j.client.MaxConnsPerHost = 10000 + for _, o := range options { + o(&lf, &f, &j) + } + if _, ok := f.HeaderMap["User-Agent"]; !ok { j.client.Name = "plt" } diff --git a/fasthttp/job_test.go b/fasthttp/job_test.go index 07e49f1..a8d8423 100644 --- a/fasthttp/job_test.go +++ b/fasthttp/job_test.go @@ -40,7 +40,7 @@ func TestNewJobProducer(t *testing.T) { Method: http.MethodPost, Compressed: true, } - j, err := fh.NewJobProducer(f) + j, err := fh.NewJobProducer(f, lf) require.NoError(t, err) j.PrepareRequest = func(_ int, req *fasthttp.Request) error { @@ -79,7 +79,7 @@ func BenchmarkJobProducer_Job(b *testing.B) { Method: http.MethodPost, Compressed: true, } - j, err := fh.NewJobProducer(f) + j, err := fh.NewJobProducer(f, lf) require.NoError(b, err) j.PrepareRequest = func(_ int, req *fasthttp.Request) error { diff --git a/loadgen/app.go b/loadgen/app.go index 0e73095..c3ce854 100644 --- a/loadgen/app.go +++ b/loadgen/app.go @@ -22,4 +22,8 @@ func (lf *Flags) Register() { kingpin.Flag("slow", "Min duration of slow response."). Default("1s").DurationVar(&lf.SlowResponse) kingpin.Flag("live-ui", "Show live ui with statistics.").BoolVar(&lf.LiveUI) + + if lf.KeyPressed == nil { + lf.KeyPressed = make(map[string]func()) + } } diff --git a/loadgen/flags.go b/loadgen/flags.go index 96fc511..9cc3039 100644 --- a/loadgen/flags.go +++ b/loadgen/flags.go @@ -3,6 +3,8 @@ package loadgen import ( "io" "time" + + "github.com/gizak/termui/v3/widgets" ) // Flags control load testing. @@ -24,6 +26,9 @@ type Flags struct { // Increment float64 // Percentage of request rate increment on each step, e.g. 5.5 for 5.5%. Output io.Writer + + KeyPressed map[string]func() + PrepareLoadLimitsWidget func(paragraph *widgets.Paragraph) } // Prepare sets conditional defaults. diff --git a/loadgen/run.go b/loadgen/run.go index 23434de..337c968 100644 --- a/loadgen/run.go +++ b/loadgen/run.go @@ -342,6 +342,12 @@ func (r *runner) decreaseRateLimit() { func (r *runner) startLiveUIPoller() { uiEvents := ui.PollEvents() for e := range uiEvents { + if f := r.lf.KeyPressed[e.ID]; f != nil { + f() + + continue + } + switch e.ID { case "q", "": r.exit <- os.Interrupt @@ -442,6 +448,10 @@ func (r *runner) runLiveUI() { loadLimits.SetRect(60, 0, 100, 7) + if r.lf.PrepareLoadLimitsWidget != nil { + r.lf.PrepareLoadLimitsWidget(loadLimits) + } + drawables = append(drawables, requestCounters, loadLimits) rpsPlot.DataLabels = make([]string, 0, len(counts)) diff --git a/nethttp/job.go b/nethttp/job.go index 463a182..237054f 100644 --- a/nethttp/job.go +++ b/nethttp/job.go @@ -26,7 +26,8 @@ import ( // JobProducer sends HTTP requests. type JobProducer struct { - PrepareRequest func(i int, req *http.Request) error + PrepareRequest func(i int, req *http.Request) error + PrepareRoundTripper func(tr http.RoundTripper) http.RoundTripper bytesWritten int64 writeTime int64 @@ -95,14 +96,22 @@ func (c countingConn) Write(b []byte) (n int, err error) { } func (j *JobProducer) makeTransport() http.RoundTripper { + var tr http.RoundTripper + switch { case j.f.HTTP2: - return j.makeTransport2() + tr = j.makeTransport2() case j.f.HTTP3: - return j.makeTransport3() + tr = j.makeTransport3() default: - return j.makeTransport1() + tr = j.makeTransport1() + } + + if j.PrepareRoundTripper != nil { + tr = j.PrepareRoundTripper(tr) } + + return tr } func (j *JobProducer) makeTransport1() *http.Transport { @@ -164,7 +173,7 @@ func (j *JobProducer) makeTransport2() *http2.Transport { } // NewJobProducer creates HTTP load generator. -func NewJobProducer(f Flags, lf loadgen.Flags) (*JobProducer, error) { +func NewJobProducer(f Flags, lf loadgen.Flags, options ...func(lf *loadgen.Flags, f *Flags, j loadgen.JobProducer)) (*JobProducer, error) { u, err := url.Parse(f.URL) if err != nil { return nil, fmt.Errorf("failed to parse URL: %w", err) @@ -181,8 +190,6 @@ func NewJobProducer(f Flags, lf loadgen.Flags) (*JobProducer, error) { j.log += fmt.Sprintln("Host resolved:", strings.Join(addrs, ",")) - j.tr = j.makeTransport() - j.dnsHist = &dynhist.Collector{BucketsLimit: 10, WeightFunc: dynhist.LatencyWidth} j.connHist = &dynhist.Collector{BucketsLimit: 10, WeightFunc: dynhist.LatencyWidth} j.tlsHist = &dynhist.Collector{BucketsLimit: 10, WeightFunc: dynhist.LatencyWidth} @@ -193,6 +200,12 @@ func NewJobProducer(f Flags, lf loadgen.Flags) (*JobProducer, error) { j.respHeader = make(map[int]http.Header, 5) j.respProto = make(map[int]string, 5) + for _, o := range options { + o(&lf, &f, &j) + } + + j.tr = j.makeTransport() + if _, ok := f.HeaderMap["User-Agent"]; !ok { f.HeaderMap["User-Agent"] = "plt" }