diff --git a/cmd/lotus-bench/cli.go b/cmd/lotus-bench/cli.go
new file mode 100644
index 00000000000..0eaeb6ccba2
--- /dev/null
+++ b/cmd/lotus-bench/cli.go
@@ -0,0 +1,312 @@
+package main
+
+import (
+    "errors"
+    "fmt"
+    "io"
+    "os"
+    "os/exec"
+    "os/signal"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/urfave/cli/v2"
+)
+
+var cliCmd = &cli.Command{
+    Name:  "cli",
+    Usage: "Runs a concurrent stress test against one or more binary commands and prints performance metrics, including a latency distribution and histogram",
+    Description: `This benchmark has the following features:
+* Can query each command both sequentially and concurrently
+* Supports rate limiting
+* Can query multiple different commands at once (supporting a different concurrency level and rate limit for each command)
+* Gives a nice reporting summary of the stress testing of each command (including latency distribution, histogram and more)
+* Easy to use
+
+To use this benchmark you must specify the commands you want to test using the --cmd option; its format is:
+
+  --cmd=CMD[:CONCURRENCY][:QPS] where only CMD is required.
+
+Here are some real examples:
+  lotus-bench cli --cmd='lotus-shed mpool miner-select-messages'       // runs the command with default concurrency and qps
+  lotus-bench cli --cmd='lotus-shed mpool miner-select-messages:3'     // override concurrency to 3
+  lotus-bench cli --cmd='lotus-shed mpool miner-select-messages::100'  // override to 100 qps while using default concurrency
+  lotus-bench cli --cmd='lotus-shed mpool miner-select-messages:3:100' // run using 3 workers but limit to 100 qps
+  lotus-bench cli --cmd='lotus-shed mpool miner-select-messages' --cmd='lotus sync wait' // run two commands at once
+`,
+    Flags: []cli.Flag{
+        &cli.DurationFlag{
+            Name:  "duration",
+            Value: 60 * time.Second,
+            Usage: "Duration of the benchmark",
+        },
+        &cli.IntFlag{
+            Name:  "concurrency",
+            Value: 10,
+            Usage: "How many workers should be used per command (can be overridden per command)",
+        },
+        &cli.IntFlag{
+            Name:  "qps",
+            Value: 0,
+            Usage: "How many requests per second should be sent per command (can be overridden per command); a value of 0 means no limit",
+        },
+        &cli.StringSliceFlag{
+            Name:  "cmd",
+            Usage: `Command to benchmark; you can specify multiple commands by repeating this flag. You can also specify command-specific options to set the concurrency and qps for each command (see usage).`,
+        },
+        &cli.DurationFlag{
+            Name:  "watch",
+            Value: 0 * time.Second,
+            Usage: "If >0 then generate a report every N seconds (only supported on linux/unix)",
+        },
+        &cli.BoolFlag{
+            Name:  "print-response",
+            Value: false,
+            Usage: "Print the response of each request",
+        },
+    },
+    Action: func(cctx *cli.Context) error {
+        if len(cctx.StringSlice("cmd")) == 0 {
+            return errors.New("you must specify at least one cmd to benchmark")
+        }
+
+        var cmds []*CMD
+        for _, str := range cctx.StringSlice("cmd") {
+            entries := strings.SplitN(str, ":", 3)
+            if len(entries) == 0 || entries[0] == "" {
+                return errors.New("invalid cmd format")
+            }
+
+            // check if concurrency was specified
+            concurrency := cctx.Int("concurrency")
+            if len(entries) > 1 {
+                if len(entries[1]) > 0 {
+                    var err error
+                    concurrency, err = strconv.Atoi(entries[1])
+                    if err != nil {
+                        return fmt.Errorf("could not parse concurrency value from command %s: %v", entries[0], err)
+                    }
+                }
+            }
+
+            // check if qps was specified
+            qps := cctx.Int("qps")
+            if len(entries) > 2 {
+                if len(entries[2]) > 0 {
+                    var err error
+                    qps, err = strconv.Atoi(entries[2])
+                    if err != nil {
+                        return fmt.Errorf("could not parse qps value from command %s: %v", entries[0], err)
+                    }
+                }
+            }
+
+            cmds = append(cmds, &CMD{
+                w:           os.Stdout,
+                cmd:         entries[0],
+                concurrency: concurrency,
+                qps:         qps,
+                printResp:   cctx.Bool("print-response"),
+            })
+        }
+
+        // terminate early on ctrl+c
+        c := make(chan os.Signal, 1)
+        signal.Notify(c, os.Interrupt)
+        go func() {
+            <-c
+            fmt.Println("Received interrupt, stopping...")
+            for _, cmd := range cmds {
+                cmd.Stop()
+            }
+        }()
+
+        // stop all workers after the configured duration
+        go func() {
+            time.Sleep(cctx.Duration("duration"))
+            for _, cmd := range cmds {
+                cmd.Stop()
+            }
+        }()
+
+        // start all commands
+        var wg sync.WaitGroup
+        wg.Add(len(cmds))
+
+        for _, cmd := range cmds {
+            go func(cmd *CMD) {
+                defer wg.Done()
+                err := cmd.Run()
+                if err != nil {
+                    fmt.Printf("error running cmd: %v\n", err)
+                }
+            }(cmd)
+        }
+
+        // if watch is set then print a report every N seconds
+        var progressCh chan struct{}
+        if cctx.Duration("watch") > 0 {
+            progressCh = make(chan struct{}, 1)
+            go func(progressCh chan struct{}) {
+                ticker := time.NewTicker(cctx.Duration("watch"))
+                for {
+                    clearAndPrintReport := func() {
+                        // clear the screen and move the cursor to the top left
+                        fmt.Print("\033[2J")
+                        fmt.Printf("\033[%d;%dH", 1, 1)
+                        for i, cmd := range cmds {
+                            cmd.Report()
+                            if i < len(cmds)-1 {
+                                fmt.Println()
+                            }
+                        }
+                    }
+                    select {
+                    case <-ticker.C:
+                        clearAndPrintReport()
+                    case <-progressCh:
+                        clearAndPrintReport()
+                        return
+                    }
+                }
+            }(progressCh)
+        }
+
+        wg.Wait()
+
+        if progressCh != nil {
+            // wait for the watch goroutine to print the final report and return
+            progressCh <- struct{}{}
+
+            // no need to print the report again
+            return nil
+        }
+
+        // print the report for each command
+        for i, cmd := range cmds {
+            cmd.Report()
+            if i < len(cmds)-1 {
+                fmt.Println()
+            }
+        }
+
+        return nil
+    },
+}
+
+// CMD handles the benchmarking of a single command.
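+// For example, --cmd='lotus-shed mpool miner-select-messages:3:100' produces a CMD
+// with cmd="lotus-shed mpool miner-select-messages", concurrency=3 and qps=100.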
+type CMD struct {
+    w io.Writer
+    // the cmd we want to benchmark
+    cmd string
+    // the number of concurrent requests to make to this command
+    concurrency int
+    // if >0 then qps is the maximum number of requests per second to make to this command (0 = no limit)
+    qps int
+    // whether to print the response of each request (useful for debugging)
+    printResp bool
+    // closed to instruct the worker goroutines to stop
+    stopCh chan struct{}
+    // ensures stopCh is only closed once (Stop can be called by both the
+    // interrupt handler and the duration timer)
+    stopOnce sync.Once
+    // when the command benchmarking started
+    start time.Time
+    // results channel is used by the workers to send results to the reporter
+    results chan *result
+    // reporter handles reading the results from workers and printing the report statistics
+    reporter *Reporter
+}
+
+func (c *CMD) Run() error {
+    var wg sync.WaitGroup
+    wg.Add(c.concurrency)
+
+    c.results = make(chan *result, c.concurrency*1_000)
+    c.stopCh = make(chan struct{})
+
+    // assign the reporter before starting it so Report never observes a nil reporter
+    c.reporter = NewReporter(c.results, c.w)
+    go c.reporter.Run()
+
+    c.start = time.Now()
+
+    // throttle the number of requests per second
+    var qpsTicker *time.Ticker
+    if c.qps > 0 {
+        qpsTicker = time.NewTicker(time.Second / time.Duration(c.qps))
+    }
+
+    for i := 0; i < c.concurrency; i++ {
+        go func() {
+            c.startWorker(qpsTicker)
+            wg.Done()
+        }()
+    }
+    wg.Wait()
+
+    // close the results channel so the reporter will stop
+    close(c.results)
+
+    // wait until the reporter is done
+    <-c.reporter.doneCh
+
+    return nil
+}
+
+func (c *CMD) startWorker(qpsTicker *time.Ticker) {
+    for {
+        // check if we should stop
+        select {
+        case <-c.stopCh:
+            return
+        default:
+        }
+
+        // wait for the next tick if we are rate limiting this command
+        if qpsTicker != nil {
+            <-qpsTicker.C
+        }
+
+        start := time.Now()
+
+        statusCode := 0
+
+        arr := strings.Fields(c.cmd)
+
+        data, err := exec.Command(arr[0], arr[1:]...).Output()
+        if err != nil {
+            if exitError, ok := err.(*exec.ExitError); ok {
+                statusCode = exitError.ExitCode()
+            } else {
+                statusCode = 1
+            }
+        } else {
+            if c.printResp {
+                fmt.Printf("[%s] %s", c.cmd, string(data))
+            }
+        }
+
+        c.results <- &result{
+            statusCode: &statusCode,
+            err:        err,
+            duration:   time.Since(start),
+        }
+    }
+}
+
+func (c *CMD) Stop() {
+    c.stopOnce.Do(func() {
+        close(c.stopCh)
+    })
+}
+
+func (c *CMD) Report() {
+    total := time.Since(c.start)
+    fmt.Fprintf(c.w, "[%s]:\n", c.cmd)
+    fmt.Fprintf(c.w, "- Options:\n")
+    fmt.Fprintf(c.w, " - concurrency: %d\n", c.concurrency)
+    fmt.Fprintf(c.w, " - qps: %d\n", c.qps)
+    c.reporter.Print(total, c.w)
+}
diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go
index fc484c4e330..f9cbe0d8bc1 100644
--- a/cmd/lotus-bench/main.go
+++ b/cmd/lotus-bench/main.go
@@ -120,6 +120,7 @@ func main() {
 			sealBenchCmd,
 			simpleCmd,
 			importBenchCmd,
+			cliCmd,
 			rpcCmd,
 		},
 	}
diff --git a/cmd/lotus-bench/reporter.go b/cmd/lotus-bench/reporter.go
new file mode 100644
index 00000000000..ad2ad6b9d86
--- /dev/null
+++ b/cmd/lotus-bench/reporter.go
@@ -0,0 +1,181 @@
+package main
+
+import (
+    "fmt"
+    "io"
+    "sort"
+    "strings"
+    "sync"
+    "text/tabwriter"
+    "time"
+)
+
+// result is the result of a single benchmark request.
+type result struct {
+    err        error
+    statusCode *int
+    duration   time.Duration
+}
+
+// Reporter reads the results from the workers through the results channel and aggregates the results.
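+// A single Reporter is shared by all workers of one command; when --watch is
+// used, Print runs concurrently with Run, which is why the fields below are
+// guarded by lock.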
+type Reporter struct {
+    // write the report to this writer
+    w io.Writer
+    // the reporter reads the results from this channel
+    results chan *result
+    // doneCh is used to signal that the reporter has finished reading the results (channel has closed)
+    doneCh chan bool
+
+    // lock protects the following fields during critical sections (if --watch was specified)
+    lock sync.Mutex
+    // the latencies of all requests
+    latencies []int64
+    // the number of requests that returned each status code
+    statusCodes map[int]int
+    // the number of errors that occurred
+    errors map[string]int
+}
+
+func NewReporter(results chan *result, w io.Writer) *Reporter {
+    return &Reporter{
+        w:           w,
+        results:     results,
+        doneCh:      make(chan bool, 1),
+        statusCodes: make(map[int]int),
+        errors:      make(map[string]int),
+    }
+}
+
+func (r *Reporter) Run() {
+    for res := range r.results {
+        r.lock.Lock()
+
+        r.latencies = append(r.latencies, res.duration.Milliseconds())
+
+        if res.statusCode != nil {
+            r.statusCodes[*res.statusCode]++
+        }
+
+        if res.err != nil {
+            if len(r.errors) < 1_000_000 {
+                r.errors[res.err.Error()]++
+            } else {
+                // we don't want to store too many errors in memory
+                r.errors["hidden"]++
+            }
+        } else {
+            r.errors["nil"]++
+        }
+
+        r.lock.Unlock()
+    }
+
+    r.doneCh <- true
+}
+
+func (r *Reporter) Print(elapsed time.Duration, w io.Writer) {
+    r.lock.Lock()
+    defer r.lock.Unlock()
+
+    nrReq := int64(len(r.latencies))
+    if nrReq == 0 {
+        fmt.Fprintln(w, "No requests were made")
+        return
+    }
+
+    // we need to sort the latencies slice to calculate the percentiles
+    sort.Slice(r.latencies, func(i, j int) bool {
+        return r.latencies[i] < r.latencies[j]
+    })
+
+    var totalLatency int64 = 0
+    for _, latency := range r.latencies {
+        totalLatency += latency
+    }
+
+    fmt.Fprintf(w, "- Total Requests: %d\n", nrReq)
+    fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds())
+    fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
+    fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq)
+    fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2])
+    fmt.Fprintf(w, "- Latency distribution:\n")
+    percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999}
+    for _, p := range percentiles {
+        idx := int64(p * float64(nrReq))
+        fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
+    }
+
+    // create a simple histogram with 10 buckets spanning the latency range
+    // in equal intervals
+    nrBucket := 10
+    buckets := make([]Bucket, nrBucket)
+    latencyRange := r.latencies[len(r.latencies)-1]
+    bucketRange := latencyRange / int64(nrBucket)
+
+    // mark the end of each bucket
+    for i := 0; i < nrBucket; i++ {
+        buckets[i].start = int64(i) * bucketRange
+        buckets[i].end = buckets[i].start + bucketRange
+        // extend the last bucket by any remaining range caused by the integer division
+        if i == nrBucket-1 {
+            buckets[i].end = latencyRange
+        }
+    }
+
+    // count the number of requests in each bucket
+    currBucket := 0
+    for i := 0; i < len(r.latencies); {
+        if r.latencies[i] <= buckets[currBucket].end {
+            buckets[currBucket].cnt++
+            i++
+        } else {
+            currBucket++
+        }
+    }
+
+    // print the histogram using a tabwriter which will align the columns nicely
+    fmt.Fprintf(w, "- Histogram:\n")
+    const padding = 2
+    tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
+    for i := 0; i < nrBucket; i++ {
+        ratio := float64(buckets[i].cnt) / float64(nrReq)
+        bars := strings.Repeat("#", int(ratio*100))
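+        // each '#' in the bar represents one percent of the total requests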
+        fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
+    }
+    tabWriter.Flush() //nolint:errcheck
+
+    fmt.Fprintf(w, "- Status codes:\n")
+    for code, cnt := range r.statusCodes {
+        fmt.Fprintf(w, " [%d]: %d\n", code, cnt)
+    }
+
+    // print the 10 most frequent errors (error strings may repeat, so they
+    // are counted and sorted by count)
+    type kv struct {
+        err string
+        cnt int
+    }
+    var sortedErrors []kv
+    for err, cnt := range r.errors {
+        sortedErrors = append(sortedErrors, kv{err, cnt})
+    }
+    sort.Slice(sortedErrors, func(i, j int) bool {
+        return sortedErrors[i].cnt > sortedErrors[j].cnt
+    })
+    fmt.Fprintf(w, "- Errors (top 10):\n")
+    for i, se := range sortedErrors {
+        if i >= 10 {
+            break
+        }
+        fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt)
+    }
+}
+
+type Bucket struct {
+    start int64
+    // the end value of the bucket
+    end int64
+    // how many entries are in the bucket
+    cnt int
+}
diff --git a/cmd/lotus-bench/rpc.go b/cmd/lotus-bench/rpc.go
index 5da784c6ef3..4af4bdb27ee 100644
--- a/cmd/lotus-bench/rpc.go
+++ b/cmd/lotus-bench/rpc.go
@@ -9,11 +9,9 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"sort"
 	"strconv"
 	"strings"
 	"sync"
-	"text/tabwriter"
 	"time"
 
 	"github.com/urfave/cli/v2"
@@ -243,13 +241,6 @@ type RPCMethod struct {
 	reporter *Reporter
 }
 
-// result is the result of a single rpc method request.
-type result struct {
-	err        error
-	statusCode *int
-	duration   time.Duration
-}
-
 func (rpc *RPCMethod) Run() error {
 	client := &http.Client{
 		Timeout: 0,
@@ -411,166 +402,3 @@ func (rpc *RPCMethod) Report() {
 	fmt.Fprintf(rpc.w, " - qps: %d\n", rpc.qps)
 	rpc.reporter.Print(total, rpc.w)
 }
-
-// Reporter reads the results from the workers through the results channel and aggregates the results.
-type Reporter struct {
-	// write the report to this writer
-	w io.Writer
-	// the reporter read the results from this channel
-	results chan *result
-	// doneCh is used to signal that the reporter has finished reading the results (channel has closed)
-	doneCh chan bool
-
-	// lock protect the following fields during critical sections (if --watch was specified)
-	lock sync.Mutex
-	// the latencies of all requests
-	latencies []int64
-	// the number of requests that returned each status code
-	statusCodes map[int]int
-	// the number of errors that occurred
-	errors map[string]int
-}
-
-func NewReporter(results chan *result, w io.Writer) *Reporter {
-	return &Reporter{
-		w:           w,
-		results:     results,
-		doneCh:      make(chan bool, 1),
-		statusCodes: make(map[int]int),
-		errors:      make(map[string]int),
-	}
-}
-
-func (r *Reporter) Run() {
-	for res := range r.results {
-		r.lock.Lock()
-
-		r.latencies = append(r.latencies, res.duration.Milliseconds())
-
-		if res.statusCode != nil {
-			r.statusCodes[*res.statusCode]++
-		}
-
-		if res.err != nil {
-			if len(r.errors) < 1_000_000 {
-				r.errors[res.err.Error()]++
-			} else {
-				// we don't want to store too many errors in memory
-				r.errors["hidden"]++
-			}
-		} else {
-			r.errors["nil"]++
-		}
-
-		r.lock.Unlock()
-	}
-
-	r.doneCh <- true
-}
-
-func (r *Reporter) Print(elapsed time.Duration, w io.Writer) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	nrReq := int64(len(r.latencies))
-	if nrReq == 0 {
-		fmt.Println("No requests were made")
-		return
-	}
-
-	// we need to sort the latencies slice to calculate the percentiles
-	sort.Slice(r.latencies, func(i, j int) bool {
-		return r.latencies[i] < r.latencies[j]
-	})
-
-	var totalLatency int64 = 0
-	for _, latency := range r.latencies {
-		totalLatency += latency
-	}
-
-	fmt.Fprintf(w, "- Total Requests: %d\n", nrReq)
-	
fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds()) - fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds()) - fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq) - fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2]) - fmt.Fprintf(w, "- Latency distribution:\n") - percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999} - for _, p := range percentiles { - idx := int64(p * float64(nrReq)) - fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx]) - } - - // create a simple histogram with 10 buckets spanning the range of latency - // into equal ranges - // - nrBucket := 10 - buckets := make([]Bucket, nrBucket) - latencyRange := r.latencies[len(r.latencies)-1] - bucketRange := latencyRange / int64(nrBucket) - - // mark the end of each bucket - for i := 0; i < nrBucket; i++ { - buckets[i].start = int64(i) * bucketRange - buckets[i].end = buckets[i].start + bucketRange - // extend the last bucked by any remaning range caused by the integer division - if i == nrBucket-1 { - buckets[i].end = latencyRange - } - } - - // count the number of requests in each bucket - currBucket := 0 - for i := 0; i < len(r.latencies); { - if r.latencies[i] <= buckets[currBucket].end { - buckets[currBucket].cnt++ - i++ - } else { - currBucket++ - } - } - - // print the histogram using a tabwriter which will align the columns nicely - fmt.Fprintf(w, "- Histogram:\n") - const padding = 2 - tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) - for i := 0; i < nrBucket; i++ { - ratio := float64(buckets[i].cnt) / float64(nrReq) - bars := strings.Repeat("#", int(ratio*100)) - fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100)) - } - tabWriter.Flush() //nolint:errcheck - - fmt.Fprintf(w, "- Status codes:\n") - for code, cnt := range r.statusCodes { - fmt.Fprintf(w, " [%d]: %d\n", code, cnt) - } - - // print the 10 most occurring errors (in case error values are not unique) - // - type kv struct { - err string - cnt int - } - var sortedErrors []kv - for err, cnt := range r.errors { - sortedErrors = append(sortedErrors, kv{err, cnt}) - } - sort.Slice(sortedErrors, func(i, j int) bool { - return sortedErrors[i].cnt > sortedErrors[j].cnt - }) - fmt.Fprintf(w, "- Errors (top 10):\n") - for i, se := range sortedErrors { - if i > 10 { - break - } - fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt) - } -} - -type Bucket struct { - start int64 - // the end value of the bucket - end int64 - // how many entries are in the bucket - cnt int -}