diff --git a/README.md b/README.md new file mode 100644 index 0000000..d3c2a80 --- /dev/null +++ b/README.md @@ -0,0 +1,47 @@ +MQTT benchmarking tool +========= +A simple MQTT (broker) benchmarking tool. + +Supports multiple concurrent clients, configurable message size, etc: +``` +> mqtt-benchmark --help +Usage of mqtt-benchmark: + -broker="tcp://localhost:1883": MQTT broker endpoint as scheme://host:port + -clients=10: Number of clients to start + -count=100: Number of messages to send per client + -format="text": Output format: text|json + -password="": MQTT password (empty if auth disabled) + -qos=1: QoS for published messages + -size=100: Size of the messages payload (bytes) + -topic="/test": MQTT topic for incoming message + -username="": MQTT username (empty if auth disabled) +``` + +Two output formats supported: human-readable plain text and JSON. + +Example use and output: + +``` +> mqtt-benchmark --broker tcp://broker.local:1883 --count 100 --size 100 --clients 100 --qos 2 --format text +.... + +======= CLIENT 27 ======= +Ratio: 1 (100/100) +Runtime (s): 16.396 +Msg time min (ms): 9.466 +Msg time max (ms): 1880.769 +Msg time mean (ms): 150.193 +Msg time std (ms): 201.884 +Bandwidth (msg/sec): 6.099 + +========= TOTAL (100) ========= +Total Ratio: 1 (10000/10000) +Total Runime (sec): 16.398 +Average Runtime (sec): 15.514 +Msg time min (ms): 7.766 +Msg time max (ms): 2034.076 +Msg time mean mean (ms): 140.751 +Msg time mean std (ms): 13.695 +Average Bandwidth (msg/sec): 6.761 +Total Bandwidth (msg/sec): 676.112 +``` \ No newline at end of file diff --git a/client.go b/client.go new file mode 100644 index 0000000..2e1fd68 --- /dev/null +++ b/client.go @@ -0,0 +1,132 @@ +package main + +import ( + "fmt" + "log" + "time" +) + +import ( + mqtt "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" + "github.com/GaryBoone/GoStats/stats" +) + +type Client struct { + ID int + BrokerURL string + BrokerUser string + BrokerPass string + MsgTopic string + MsgSize int + MsgCount int + MsgQoS byte +} + +func (c *Client) Run(res chan *RunResults) { + newMsgs := make(chan *Message) + pubMsgs := make(chan *Message) + doneGen := make(chan bool) + donePub := make(chan bool) + runResults := new(RunResults) + + started := time.Now() + // start generator + go c.genMessages(newMsgs, doneGen) + // start publisher + go c.pubMessages(newMsgs, pubMsgs, doneGen, donePub) + + runResults.ID = c.ID + times := []float64{} + for { + select { + case m := <-pubMsgs: + if m.Error { + log.Printf("CLIENT %v ERROR publishing message: %v: at %v\n", c.ID, m.Topic, m.Sent.Unix()) + runResults.Failures++ + } else { + // log.Printf("Message published: %v: sent: %v delivered: %v flight time: %v\n", m.Topic, m.Sent, m.Delivered, m.Delivered.Sub(m.Sent)) + runResults.Successes++ + times = append(times, m.Delivered.Sub(m.Sent).Seconds()*1000) // in milliseconds + } + case <-donePub: + // calculate results + duration := time.Now().Sub(started) + runResults.MsgTimeMin = stats.StatsMin(times) + runResults.MsgTimeMax = stats.StatsMax(times) + runResults.MsgTimeMean = stats.StatsMean(times) + runResults.MsgTimeStd = stats.StatsSampleStandardDeviation(times) + runResults.RunTime = duration.Seconds() + runResults.MsgsPerSec = float64(runResults.Successes) / duration.Seconds() + + // report results and exit + res <- runResults + return + } + } +} + +func (c *Client) genMessages(ch chan *Message, done chan bool) { + for i := 0; i < c.MsgCount; i++ { + ch <- &Message{ + Topic: c.MsgTopic, + QoS: c.MsgQoS, + Payload: make([]byte, c.MsgSize), + } + } + done <- true + // log.Printf("CLIENT %v is done generating messages\n", c.ID) + return +} + +func (c *Client) pubMessages(in, out chan *Message, doneGen, donePub chan bool) { + onConnected := func(client *mqtt.Client) { + log.Printf("CLIENT %v is connected to the broker %v\n", c.ID, c.BrokerURL) + ctr := 0 + for { + select { + case m := <-in: + m.Sent = time.Now() + token := client.Publish(m.Topic, m.QoS, false, m.Payload) + token.Wait() + if token.Error() != nil { + log.Printf("CLIENT %v Error sending message: %v\n", c.ID, token.Error()) + m.Error = true + } else { + m.Delivered = time.Now() + m.Error = false + } + out <- m + + if ctr > 0 && ctr%100 == 0 { + log.Printf("CLIENT %v published %v messages and keeps publishing...\n", c.ID, ctr) + } + ctr++ + case <-doneGen: + donePub <- true + log.Printf("CLIENT %v is done publishing\n", c.ID) + return + } + } + } + + opts := mqtt.NewClientOptions(). + AddBroker(c.BrokerURL). + SetClientID(fmt.Sprintf("mqtt-benchmark-%v-%v", time.Now(), c.ID)). + SetCleanSession(true). + SetAutoReconnect(true). + SetOnConnectHandler(onConnected). + SetConnectionLostHandler(func(client *mqtt.Client, reason error) { + log.Printf("CLIENT %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error()) + }) + if c.BrokerUser != "" && c.BrokerPass != "" { + opts.SetUsername(c.BrokerUser) + opts.SetPassword(c.BrokerPass) + } + client := mqtt.NewClient(opts) + token := client.Connect() + token.Wait() + + if token.Error() != nil { + log.Printf("CLIENT %v had error connecting to the broker: %v\n", c.ID, token.Error()) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..07c4cb3 --- /dev/null +++ b/main.go @@ -0,0 +1,171 @@ +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "log" + "time" + + "github.com/GaryBoone/GoStats/stats" +) + +type Message struct { + Topic string + QoS byte + Payload interface{} + Sent time.Time + Delivered time.Time + Error bool +} + +type RunResults struct { + ID int `json:"id"` + Successes int64 `json:"successes"` + Failures int64 `json:"failures"` + RunTime float64 `json:"run_time"` + MsgTimeMin float64 `json:"msg_tim_min"` + MsgTimeMax float64 `json:"msg_time_max"` + MsgTimeMean float64 `json:"msg_time_mean"` + MsgTimeStd float64 `json:"msg_time_std"` + MsgsPerSec float64 `json:"msgs_per_sec"` +} + +type TotalResults struct { + Successes int64 `json:"successes"` + Failures int64 `json:"failures"` + TotalRunTime float64 `json:"total_run_time"` + AvgRunTime float64 `json:"avg_run_time"` + MsgTimeMin float64 `json:"msg_time_min"` + MsgTimeMax float64 `json:"msg_time_max"` + MsgTimeMeanAvg float64 `json:"msg_time_mean_avg"` + MsgTimeMeanStd float64 `json:"msg_time_mean_std"` + TotalMsgsPerSec float64 `json:"total_msgs_per_sec"` + AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"` +} + +type JSONResults struct { + Runs []*RunResults `json:"runs"` + Totals *TotalResults `json:"totals"` +} + +func main() { + + var ( + broker = flag.String("broker", "tcp://localhost:1883", "MQTT broker endpoint as scheme://host:port") + topic = flag.String("topic", "/test", "MQTT topic for incoming message") + username = flag.String("username", "", "MQTT username (empty if auth disabled)") + password = flag.String("password", "", "MQTT password (empty if auth disabled)") + qos = flag.Int("qos", 1, "QoS for published messages") + size = flag.Int("size", 100, "Size of the messages payload (bytes)") + count = flag.Int("count", 100, "Number of messages to send per client") + clients = flag.Int("clients", 10, "Number of clients to start") + format = flag.String("format", "text", "Output format: text|json") + ) + + flag.Parse() + if *clients < 0 { + log.Fatal("Invlalid arguments") + } + + resCh := make(chan *RunResults) + start := time.Now() + for i := 0; i < *clients; i++ { + log.Println("Starting client ", i) + c := &Client{ + ID: i, + BrokerURL: *broker, + BrokerUser: *username, + BrokerPass: *password, + MsgTopic: *topic, + MsgSize: *size, + MsgCount: *count, + MsgQoS: byte(*qos), + } + go c.Run(resCh) + } + + // collect the results + results := make([]*RunResults, *clients) + for i := 0; i < *clients; i++ { + results[i] = <-resCh + } + totalTime := time.Now().Sub(start) + totals := calculateTotalResults(results, totalTime) + + // print stats + printResults(results, totals, *format) +} +func calculateTotalResults(results []*RunResults, totalTime time.Duration) *TotalResults { + totals := new(TotalResults) + totals.TotalRunTime = totalTime.Seconds() + + msgTimeMeans := make([]float64, len(results)) + msgsPerSecs := make([]float64, len(results)) + runTimes := make([]float64, len(results)) + bws := make([]float64, len(results)) + + totals.MsgTimeMin = results[0].MsgTimeMin + for i, res := range results { + totals.Successes += res.Successes + totals.Failures += res.Failures + totals.TotalMsgsPerSec += res.MsgsPerSec + + if res.MsgTimeMin < totals.MsgTimeMin { + totals.MsgTimeMin = res.MsgTimeMin + } + + if res.MsgTimeMax > totals.MsgTimeMax { + totals.MsgTimeMax = res.MsgTimeMax + } + + msgTimeMeans[i] = res.MsgTimeMean + msgsPerSecs[i] = res.MsgsPerSec + runTimes[i] = res.RunTime + bws[i] = res.MsgsPerSec + } + totals.AvgMsgsPerSec = stats.StatsMean(msgsPerSecs) + totals.AvgRunTime = stats.StatsMean(runTimes) + totals.MsgTimeMeanAvg = stats.StatsMean(msgTimeMeans) + totals.MsgTimeMeanStd = stats.StatsSampleStandardDeviation(msgTimeMeans) + + return totals +} + +func printResults(results []*RunResults, totals *TotalResults, format string) { + switch format { + case "json": + jr := JSONResults{ + Runs: results, + Totals: totals, + } + data, _ := json.Marshal(jr) + var out bytes.Buffer + json.Indent(&out, data, "", "\t") + + fmt.Println(string(out.Bytes())) + default: + for _, res := range results { + fmt.Printf("======= CLIENT %d =======\n", res.ID) + fmt.Printf("Ratio: %d (%d/%d)\n", res.Successes/(res.Successes+res.Failures), res.Successes, res.Successes+res.Failures) + fmt.Printf("Runtime (s): %.3f\n", res.RunTime) + fmt.Printf("Msg time min (ms): %.3f\n", res.MsgTimeMin) + fmt.Printf("Msg time max (ms): %.3f\n", res.MsgTimeMax) + fmt.Printf("Msg time mean (ms): %.3f\n", res.MsgTimeMean) + fmt.Printf("Msg time std (ms): %.3f\n", res.MsgTimeStd) + fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec) + } + fmt.Printf("========= TOTAL (%d) =========\n", len(results)) + fmt.Printf("Total Ratio: %d (%d/%d)\n", totals.Successes/(totals.Successes+totals.Failures), totals.Successes, totals.Successes+totals.Failures) + fmt.Printf("Total Runime (sec): %.3f\n", totals.TotalRunTime) + fmt.Printf("Average Runtime (sec): %.3f\n", totals.AvgRunTime) + fmt.Printf("Msg time min (ms): %.3f\n", totals.MsgTimeMin) + fmt.Printf("Msg time max (ms): %.3f\n", totals.MsgTimeMax) + fmt.Printf("Msg time mean mean (ms): %.3f\n", totals.MsgTimeMeanAvg) + fmt.Printf("Msg time mean std (ms): %.3f\n", totals.MsgTimeMeanStd) + fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec) + fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec) + } + return +}