Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
krylovsk committed Jun 7, 2015
0 parents commit e38e5ec
Show file tree
Hide file tree
Showing 3 changed files with 350 additions and 0 deletions.
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
MQTT benchmarking tool
=========
A simple MQTT (broker) benchmarking tool.

Supports multiple concurrent clients, configurable message size, etc:
```
> mqtt-benchmark --help
Usage of mqtt-benchmark:
-broker="tcp://localhost:1883": MQTT broker endpoint as scheme://host:port
-clients=10: Number of clients to start
-count=100: Number of messages to send per client
-format="text": Output format: text|json
-password="": MQTT password (empty if auth disabled)
-qos=1: QoS for published messages
-size=100: Size of the messages payload (bytes)
-topic="/test": MQTT topic for incoming message
-username="": MQTT username (empty if auth disabled)
```

Two output formats supported: human-readable plain text and JSON.

Example use and output:

```
> mqtt-benchmark --broker tcp://broker.local:1883 --count 100 --size 100 --clients 100 --qos 2 --format text
....
======= CLIENT 27 =======
Ratio: 1 (100/100)
Runtime (s): 16.396
Msg time min (ms): 9.466
Msg time max (ms): 1880.769
Msg time mean (ms): 150.193
Msg time std (ms): 201.884
Bandwidth (msg/sec): 6.099
========= TOTAL (100) =========
Total Ratio: 1 (10000/10000)
Total Runime (sec): 16.398
Average Runtime (sec): 15.514
Msg time min (ms): 7.766
Msg time max (ms): 2034.076
Msg time mean mean (ms): 140.751
Msg time mean std (ms): 13.695
Average Bandwidth (msg/sec): 6.761
Total Bandwidth (msg/sec): 676.112
```
132 changes: 132 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package main

import (
"fmt"
"log"
"time"
)

import (
mqtt "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"github.com/GaryBoone/GoStats/stats"
)

type Client struct {
ID int
BrokerURL string
BrokerUser string
BrokerPass string
MsgTopic string
MsgSize int
MsgCount int
MsgQoS byte
}

func (c *Client) Run(res chan *RunResults) {
newMsgs := make(chan *Message)
pubMsgs := make(chan *Message)
doneGen := make(chan bool)
donePub := make(chan bool)
runResults := new(RunResults)

started := time.Now()
// start generator
go c.genMessages(newMsgs, doneGen)
// start publisher
go c.pubMessages(newMsgs, pubMsgs, doneGen, donePub)

runResults.ID = c.ID
times := []float64{}
for {
select {
case m := <-pubMsgs:
if m.Error {
log.Printf("CLIENT %v ERROR publishing message: %v: at %v\n", c.ID, m.Topic, m.Sent.Unix())
runResults.Failures++
} else {
// log.Printf("Message published: %v: sent: %v delivered: %v flight time: %v\n", m.Topic, m.Sent, m.Delivered, m.Delivered.Sub(m.Sent))
runResults.Successes++
times = append(times, m.Delivered.Sub(m.Sent).Seconds()*1000) // in milliseconds
}
case <-donePub:
// calculate results
duration := time.Now().Sub(started)
runResults.MsgTimeMin = stats.StatsMin(times)
runResults.MsgTimeMax = stats.StatsMax(times)
runResults.MsgTimeMean = stats.StatsMean(times)
runResults.MsgTimeStd = stats.StatsSampleStandardDeviation(times)
runResults.RunTime = duration.Seconds()
runResults.MsgsPerSec = float64(runResults.Successes) / duration.Seconds()

// report results and exit
res <- runResults
return
}
}
}

func (c *Client) genMessages(ch chan *Message, done chan bool) {
for i := 0; i < c.MsgCount; i++ {
ch <- &Message{
Topic: c.MsgTopic,
QoS: c.MsgQoS,
Payload: make([]byte, c.MsgSize),
}
}
done <- true
// log.Printf("CLIENT %v is done generating messages\n", c.ID)
return
}

func (c *Client) pubMessages(in, out chan *Message, doneGen, donePub chan bool) {
onConnected := func(client *mqtt.Client) {
log.Printf("CLIENT %v is connected to the broker %v\n", c.ID, c.BrokerURL)
ctr := 0
for {
select {
case m := <-in:
m.Sent = time.Now()
token := client.Publish(m.Topic, m.QoS, false, m.Payload)
token.Wait()
if token.Error() != nil {
log.Printf("CLIENT %v Error sending message: %v\n", c.ID, token.Error())
m.Error = true
} else {
m.Delivered = time.Now()
m.Error = false
}
out <- m

if ctr > 0 && ctr%100 == 0 {
log.Printf("CLIENT %v published %v messages and keeps publishing...\n", c.ID, ctr)
}
ctr++
case <-doneGen:
donePub <- true
log.Printf("CLIENT %v is done publishing\n", c.ID)
return
}
}
}

opts := mqtt.NewClientOptions().
AddBroker(c.BrokerURL).
SetClientID(fmt.Sprintf("mqtt-benchmark-%v-%v", time.Now(), c.ID)).
SetCleanSession(true).
SetAutoReconnect(true).
SetOnConnectHandler(onConnected).
SetConnectionLostHandler(func(client *mqtt.Client, reason error) {
log.Printf("CLIENT %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error())
})
if c.BrokerUser != "" && c.BrokerPass != "" {
opts.SetUsername(c.BrokerUser)
opts.SetPassword(c.BrokerPass)
}
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()

if token.Error() != nil {
log.Printf("CLIENT %v had error connecting to the broker: %v\n", c.ID, token.Error())
}
}
171 changes: 171 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package main

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"log"
"time"

"github.com/GaryBoone/GoStats/stats"
)

type Message struct {
Topic string
QoS byte
Payload interface{}
Sent time.Time
Delivered time.Time
Error bool
}

type RunResults struct {
ID int `json:"id"`
Successes int64 `json:"successes"`
Failures int64 `json:"failures"`
RunTime float64 `json:"run_time"`
MsgTimeMin float64 `json:"msg_tim_min"`
MsgTimeMax float64 `json:"msg_time_max"`
MsgTimeMean float64 `json:"msg_time_mean"`
MsgTimeStd float64 `json:"msg_time_std"`
MsgsPerSec float64 `json:"msgs_per_sec"`
}

type TotalResults struct {
Successes int64 `json:"successes"`
Failures int64 `json:"failures"`
TotalRunTime float64 `json:"total_run_time"`
AvgRunTime float64 `json:"avg_run_time"`
MsgTimeMin float64 `json:"msg_time_min"`
MsgTimeMax float64 `json:"msg_time_max"`
MsgTimeMeanAvg float64 `json:"msg_time_mean_avg"`
MsgTimeMeanStd float64 `json:"msg_time_mean_std"`
TotalMsgsPerSec float64 `json:"total_msgs_per_sec"`
AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"`
}

type JSONResults struct {
Runs []*RunResults `json:"runs"`
Totals *TotalResults `json:"totals"`
}

func main() {

var (
broker = flag.String("broker", "tcp://localhost:1883", "MQTT broker endpoint as scheme://host:port")
topic = flag.String("topic", "/test", "MQTT topic for incoming message")
username = flag.String("username", "", "MQTT username (empty if auth disabled)")
password = flag.String("password", "", "MQTT password (empty if auth disabled)")
qos = flag.Int("qos", 1, "QoS for published messages")
size = flag.Int("size", 100, "Size of the messages payload (bytes)")
count = flag.Int("count", 100, "Number of messages to send per client")
clients = flag.Int("clients", 10, "Number of clients to start")
format = flag.String("format", "text", "Output format: text|json")
)

flag.Parse()
if *clients < 0 {
log.Fatal("Invlalid arguments")
}

resCh := make(chan *RunResults)
start := time.Now()
for i := 0; i < *clients; i++ {
log.Println("Starting client ", i)
c := &Client{
ID: i,
BrokerURL: *broker,
BrokerUser: *username,
BrokerPass: *password,
MsgTopic: *topic,
MsgSize: *size,
MsgCount: *count,
MsgQoS: byte(*qos),
}
go c.Run(resCh)
}

// collect the results
results := make([]*RunResults, *clients)
for i := 0; i < *clients; i++ {
results[i] = <-resCh
}
totalTime := time.Now().Sub(start)
totals := calculateTotalResults(results, totalTime)

// print stats
printResults(results, totals, *format)
}
func calculateTotalResults(results []*RunResults, totalTime time.Duration) *TotalResults {
totals := new(TotalResults)
totals.TotalRunTime = totalTime.Seconds()

msgTimeMeans := make([]float64, len(results))
msgsPerSecs := make([]float64, len(results))
runTimes := make([]float64, len(results))
bws := make([]float64, len(results))

totals.MsgTimeMin = results[0].MsgTimeMin
for i, res := range results {
totals.Successes += res.Successes
totals.Failures += res.Failures
totals.TotalMsgsPerSec += res.MsgsPerSec

if res.MsgTimeMin < totals.MsgTimeMin {
totals.MsgTimeMin = res.MsgTimeMin
}

if res.MsgTimeMax > totals.MsgTimeMax {
totals.MsgTimeMax = res.MsgTimeMax
}

msgTimeMeans[i] = res.MsgTimeMean
msgsPerSecs[i] = res.MsgsPerSec
runTimes[i] = res.RunTime
bws[i] = res.MsgsPerSec
}
totals.AvgMsgsPerSec = stats.StatsMean(msgsPerSecs)
totals.AvgRunTime = stats.StatsMean(runTimes)
totals.MsgTimeMeanAvg = stats.StatsMean(msgTimeMeans)
totals.MsgTimeMeanStd = stats.StatsSampleStandardDeviation(msgTimeMeans)

return totals
}

func printResults(results []*RunResults, totals *TotalResults, format string) {
switch format {
case "json":
jr := JSONResults{
Runs: results,
Totals: totals,
}
data, _ := json.Marshal(jr)
var out bytes.Buffer
json.Indent(&out, data, "", "\t")

fmt.Println(string(out.Bytes()))
default:
for _, res := range results {
fmt.Printf("======= CLIENT %d =======\n", res.ID)
fmt.Printf("Ratio: %d (%d/%d)\n", res.Successes/(res.Successes+res.Failures), res.Successes, res.Successes+res.Failures)
fmt.Printf("Runtime (s): %.3f\n", res.RunTime)
fmt.Printf("Msg time min (ms): %.3f\n", res.MsgTimeMin)
fmt.Printf("Msg time max (ms): %.3f\n", res.MsgTimeMax)
fmt.Printf("Msg time mean (ms): %.3f\n", res.MsgTimeMean)
fmt.Printf("Msg time std (ms): %.3f\n", res.MsgTimeStd)
fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec)
}
fmt.Printf("========= TOTAL (%d) =========\n", len(results))
fmt.Printf("Total Ratio: %d (%d/%d)\n", totals.Successes/(totals.Successes+totals.Failures), totals.Successes, totals.Successes+totals.Failures)
fmt.Printf("Total Runime (sec): %.3f\n", totals.TotalRunTime)
fmt.Printf("Average Runtime (sec): %.3f\n", totals.AvgRunTime)
fmt.Printf("Msg time min (ms): %.3f\n", totals.MsgTimeMin)
fmt.Printf("Msg time max (ms): %.3f\n", totals.MsgTimeMax)
fmt.Printf("Msg time mean mean (ms): %.3f\n", totals.MsgTimeMeanAvg)
fmt.Printf("Msg time mean std (ms): %.3f\n", totals.MsgTimeMeanStd)
fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec)
fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec)
}
return
}

0 comments on commit e38e5ec

Please sign in to comment.