Skip to content

Commit

Permalink
Merge pull request #153 from raintank/concurrentGrafanaNet
Browse files Browse the repository at this point in the history
Concurrent grafana net
  • Loading branch information
Dieterbe authored Feb 16, 2017
2 parents dd19f4e + 785e1dd commit 895d0d1
Show file tree
Hide file tree
Showing 21 changed files with 2,874 additions and 306 deletions.
31 changes: 28 additions & 3 deletions imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
optTimeout
optSSLVerify
word
optConcurrency
optOrgId
)

// we should make sure we apply changes atomically. e.g. when changing dest between address A and pickle=false and B with pickle=true,
Expand Down Expand Up @@ -81,6 +83,8 @@ var tokens = []toki.Def{
{Token: optFlushMaxWait, Pattern: "flushMaxWait="},
{Token: optTimeout, Pattern: "timeout="},
{Token: optSSLVerify, Pattern: "sslverify="},
{Token: optConcurrency, Pattern: "concurrency="},
{Token: optOrgId, Pattern: "orgId="},
{Token: str, Pattern: "\".*\""},
{Token: sep, Pattern: "##"},
{Token: sumFn, Pattern: "sum"},
Expand All @@ -94,7 +98,7 @@ var tokens = []toki.Def{
var errFmtAddBlack = errors.New("addBlack <prefix|sub|regex> <pattern>")
var errFmtAddAgg = errors.New("addAgg <sum|avg> <regex> <fmt> <interval> <wait>")
var errFmtAddRoute = errors.New("addRoute <type> <key> [prefix/sub/regex=,..] <dest> [<dest>[...]] where <dest> is <addr> [prefix/sub,regex,flush,reconn,pickle,spool=...]") // note flush and reconn are ints, pickle and spool are true/false. other options are strings
var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]")
var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int orgId=int]")
var errFmtAddDest = errors.New("addDest <routeKey> <dest>") // not implemented yet
var errFmtAddRewriter = errors.New("addRewriter <old> <new> <max>")
var errFmtModDest = errors.New("modDest <routeKey> <dest> <addr/prefix/sub/regex=>") // one or more can be specified at once
Expand Down Expand Up @@ -302,6 +306,8 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error {
var flushMaxNum = 10000 // number of metrics
var flushMaxWait = 500 // in ms
var timeout = 2000 // in ms
var concurrency = 10 // number of concurrent connections to tsdb-gw
var orgId = 1

for ; t.Token != toki.EOF; t = s.Next() {
switch t.Token {
Expand Down Expand Up @@ -345,6 +351,16 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error {
} else {
return errFmtAddRouteGrafanaNet
}
case optConcurrency:
t = s.Next()
if t.Token == num {
concurrency, err = strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return err
}
} else {
return errFmtAddRouteGrafanaNet
}
case optTimeout:
t = s.Next()
if t.Token == num {
Expand All @@ -365,13 +381,22 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error {
} else {
return errFmtAddRouteGrafanaNet
}

case optOrgId:
t = s.Next()
if t.Token == num {
orgId, err = strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return err
}
} else {
return errFmtAddRouteGrafanaNet
}
default:
return fmt.Errorf("unexpected token %d %q", t.Token, t.Value)
}
}

route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout)
route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency, orgId)
if err != nil {
return err
}
Expand Down
151 changes: 105 additions & 46 deletions route/grafananet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"hash/fnv"
"net"
"net/http"
"strconv"
Expand All @@ -13,6 +14,7 @@ import (
"time"

"github.com/Dieterbe/go-metrics"
"github.com/golang/snappy"
dest "github.com/graphite-ng/carbon-relay-ng/destination"
"github.com/graphite-ng/carbon-relay-ng/matcher"
"github.com/graphite-ng/carbon-relay-ng/stats"
Expand All @@ -36,6 +38,12 @@ type GrafanaNet struct {
flushMaxWait time.Duration
timeout time.Duration
sslVerify bool
concurrency int
orgId int
writeQueues []chan []byte
shutdown chan struct{}
wg *sync.WaitGroup
client *http.Client

numErrFlush metrics.Counter
numOut metrics.Counter // metrics successfully written to our buffered conn (no flushing yet)
Expand All @@ -49,7 +57,7 @@ type GrafanaNet struct {
// NewGrafanaNet creates a special route that writes to a grafana.net datastore
// We will automatically run the route and the destination
// ignores spool for now
func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout int) (Route, error) {
func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency, orgId int) (Route, error) {
m, err := matcher.New(prefix, sub, regex)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,6 +95,11 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp
flushMaxWait: time.Duration(flushMaxWait) * time.Millisecond,
timeout: time.Duration(timeout) * time.Millisecond,
sslVerify: sslVerify,
concurrency: concurrency,
orgId: orgId,
writeQueues: make([]chan []byte, concurrency),
shutdown: make(chan struct{}),
wg: new(sync.WaitGroup),

numErrFlush: stats.Counter("dest=" + cleanAddr + ".unit=Err.type=flush"),
numOut: stats.Counter("dest=" + cleanAddr + ".unit=Metric.direction=out"),
Expand All @@ -96,21 +109,17 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp
manuFlushSize: stats.Histogram("dest=" + cleanAddr + ".unit=B.what=FlushSize.type=manual"),
numBuffered: stats.Gauge("dest=" + cleanAddr + ".unit=Metric.what=numBuffered"),
}

for i := 0; i < r.concurrency; i++ {
r.writeQueues[i] = make(chan []byte)
}
r.config.Store(baseConfig{*m, make([]*dest.Destination, 0)})
go r.run()
return r, nil
}

func (route *GrafanaNet) run() {
metrics := make([]*schema.MetricData, 0, route.flushMaxNum)
ticker := time.NewTicker(route.flushMaxWait)
client := &http.Client{
Timeout: route.timeout,
r.client = &http.Client{
Timeout: r.timeout,
}
if !route.sslVerify {
if !r.sslVerify {
// this transport should be the equivalent of Go's DefaultTransport
client.Transport = &http.Transport{
r.client.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
Expand All @@ -125,41 +134,101 @@ func (route *GrafanaNet) run() {
}
}

b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: time.Minute,
Factor: 1.5,
Jitter: true,
go r.run()
return r, nil
}

func (route *GrafanaNet) run() {
metrics := make([][]*schema.MetricData, route.concurrency)
route.wg.Add(route.concurrency)
for i := 0; i < route.concurrency; i++ {
metrics[i] = make([]*schema.MetricData, 0, route.flushMaxNum)

// start up our goroutines for writing data to tsdb-gw
go route.flush(i)
}

flush := func() {
if len(metrics) == 0 {
flush := func(shard int) {
if len(metrics[shard]) == 0 {
return
}

mda := schema.MetricDataArray(metrics)
mda := schema.MetricDataArray(metrics[shard])
data, err := msg.CreateMsg(mda, 0, msg.FormatMetricDataArrayMsgp)
if err != nil {
panic(err)
}
route.writeQueues[shard] <- data
route.numOut.Inc(int64(len(metrics[shard])))
metrics[shard] = metrics[shard][:0]

}

hasher := fnv.New32a()

ticker := time.NewTicker(route.flushMaxWait)
for {
select {
case buf := <-route.buf:
route.numBuffered.Dec(1)
md := parseMetric(buf, route.schemas, route.orgId)
if md == nil {
continue
}
md.SetId()

//re-use our []byte slice to save an allocation.
buf = md.KeyBySeries(buf[:0])
hasher.Reset()
hasher.Write(buf)
shard := int(hasher.Sum32() % uint32(route.concurrency))
metrics[shard] = append(metrics[shard], md)
if len(metrics[shard]) == route.flushMaxNum {
flush(shard)
}
case <-ticker.C:
for shard := 0; shard < route.concurrency; shard++ {
flush(shard)
}
case <-route.shutdown:
for shard := 0; shard < route.concurrency; shard++ {
flush(shard)
close(route.writeQueues[shard])
}
return
}
}
}

func (route *GrafanaNet) flush(shard int) {
b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: time.Minute,
Factor: 1.5,
Jitter: true,
}
body := new(bytes.Buffer)
for data := range route.writeQueues[shard] {
for {
pre := time.Now()
req, err := http.NewRequest("POST", route.addr, bytes.NewBuffer(data))
body.Reset()
snappyBody := snappy.NewWriter(body)
snappyBody.Write(data)
snappyBody.Close()
req, err := http.NewRequest("POST", route.addr, body)
if err != nil {
panic(err)
}
req.Header.Add("Authorization", "Bearer "+route.apiKey)
req.Header.Add("Content-Type", "rt-metric-binary")
resp, err := client.Do(req)
req.Header.Add("Content-Type", "rt-metric-binary-snappy")
resp, err := route.client.Do(req)
diff := time.Since(pre)
if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
b.Reset()
log.Info("GrafanaNet sent %d metrics in %s -msg size %d", len(metrics), diff, len(data))
route.numOut.Inc(int64(len(metrics)))
log.Info("GrafanaNet sent metrics in %s -msg size %d", diff, len(data))

route.tickFlushSize.Update(int64(len(data)))
route.durationTickFlush.Update(diff)
metrics = metrics[:0]

resp.Body.Close()
break
}
Expand All @@ -177,26 +246,10 @@ func (route *GrafanaNet) run() {
time.Sleep(dur)
}
}
for {
select {
case buf := <-route.buf:
route.numBuffered.Dec(1)
md := parseMetric(buf, route.schemas)
if md == nil {
continue
}
md.SetId()
metrics = append(metrics, md)
if len(metrics) == route.flushMaxNum {
flush()
}
case <-ticker.C:
flush()
}
}
route.wg.Done()
}

func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricData {
func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) *schema.MetricData {
msg := strings.TrimSpace(string(buf))

elements := strings.Fields(msg)
Expand Down Expand Up @@ -230,7 +283,7 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricDat
Time: int64(timestamp),
Mtype: "gauge",
Tags: []string{},
OrgId: 1, // the hosted tsdb service will adjust to the correct OrgId if using a regular key. orgid 1 is only retained with admin key.
OrgId: orgId, // This may be overwritten by the TSDB-GW if it does not match the orgId of the apiKey used
}
return &md
}
Expand All @@ -251,6 +304,12 @@ func (route *GrafanaNet) Flush() error {

// Shutdown signals the run loop to stop and waits for the writer goroutines
// to exit.
//
// It sends one value on route.shutdown. On receiving it, run flushes every
// shard's pending metrics, closes each write queue and returns. Each flush
// goroutine calls route.wg.Done() once the range over its write queue ends,
// so route.wg.Wait() returns only after every flush goroutine has exited.
// The returned error is always nil.
//
// NOTE(review): route.shutdown is created unbuffered and run returns after
// handling a single signal, so a second call to Shutdown looks like it would
// block forever on the send — confirm callers invoke it only once.
func (route *GrafanaNet) Shutdown() error {
//conf := route.config.Load().(Config)

// trigger all of our queues to be flushed to the tsdb-gw
route.shutdown <- struct{}{}

// wait for all tsdb-gw writes to complete.
route.wg.Wait()
return nil
}

Expand Down
15 changes: 15 additions & 0 deletions vendor/github.com/golang/snappy/AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions vendor/github.com/golang/snappy/CONTRIBUTORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 895d0d1

Please sign in to comment.