Skip to content

Commit

Permalink
add orgId option to grafanaNet route handler
Browse files Browse the repository at this point in the history
- this allows using the tsdb-gw "adminkey" to send data as any
org.
  • Loading branch information
woodsaj committed Feb 16, 2017
1 parent 6c0a47f commit 785e1dd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
18 changes: 15 additions & 3 deletions imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
optSSLVerify
word
optConcurrency
optOrgId
)

// we should make sure we apply changes atomatically. e.g. when changing dest between address A and pickle=false and B with pickle=true,
Expand Down Expand Up @@ -83,6 +84,7 @@ var tokens = []toki.Def{
{Token: optTimeout, Pattern: "timeout="},
{Token: optSSLVerify, Pattern: "sslverify="},
{Token: optConcurrency, Pattern: "concurrency="},
{Token: optOrgId, Pattern: "orgId="},
{Token: str, Pattern: "\".*\""},
{Token: sep, Pattern: "##"},
{Token: sumFn, Pattern: "sum"},
Expand All @@ -96,7 +98,7 @@ var tokens = []toki.Def{
var errFmtAddBlack = errors.New("addBlack <prefix|sub|regex> <pattern>")
var errFmtAddAgg = errors.New("addAgg <sum|avg> <regex> <fmt> <interval> <wait>")
var errFmtAddRoute = errors.New("addRoute <type> <key> [prefix/sub/regex=,..] <dest> [<dest>[...]] where <dest> is <addr> [prefix/sub,regex,flush,reconn,pickle,spool=...]") // note flush and reconn are ints, pickle and spool are true/false. other options are strings
var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int]")
var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int orgId=int]")
var errFmtAddDest = errors.New("addDest <routeKey> <dest>") // not implemented yet
var errFmtAddRewriter = errors.New("addRewriter <old> <new> <max>")
var errFmtModDest = errors.New("modDest <routeKey> <dest> <addr/prefix/sub/regex=>") // one or more can be specified at once
Expand Down Expand Up @@ -305,6 +307,7 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error {
var flushMaxWait = 500 // in ms
var timeout = 2000 // in ms
var concurrency = 10 // number of concurrent connections to tsdb-gw
var orgId = 1

for ; t.Token != toki.EOF; t = s.Next() {
switch t.Token {
Expand Down Expand Up @@ -378,13 +381,22 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error {
} else {
return errFmtAddRouteGrafanaNet
}

case optOrgId:
t = s.Next()
if t.Token == num {
orgId, err = strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return err
}
} else {
return errFmtAddRouteGrafanaNet
}
default:
return fmt.Errorf("unexpected token %d %q", t.Token, t.Value)
}
}

route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency)
route, err := route.NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile, spool, sslVerify, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency, orgId)
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions route/grafananet.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type GrafanaNet struct {
timeout time.Duration
sslVerify bool
concurrency int
orgId int
writeQueues []chan []byte
shutdown chan struct{}
wg *sync.WaitGroup
Expand All @@ -56,7 +57,7 @@ type GrafanaNet struct {
// NewGrafanaNet creates a special route that writes to a grafana.net datastore
// We will automatically run the route and the destination
// ignores spool for now
func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency int) (Route, error) {
func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, spool, sslVerify bool, bufSize, flushMaxNum, flushMaxWait, timeout, concurrency, orgId int) (Route, error) {
m, err := matcher.New(prefix, sub, regex)
if err != nil {
return nil, err
Expand Down Expand Up @@ -95,6 +96,7 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp
timeout: time.Duration(timeout) * time.Millisecond,
sslVerify: sslVerify,
concurrency: concurrency,
orgId: orgId,
writeQueues: make([]chan []byte, concurrency),
shutdown: make(chan struct{}),
wg: new(sync.WaitGroup),
Expand Down Expand Up @@ -168,11 +170,13 @@ func (route *GrafanaNet) run() {
select {
case buf := <-route.buf:
route.numBuffered.Dec(1)
md := parseMetric(buf, route.schemas)
md := parseMetric(buf, route.schemas, route.orgId)
if md == nil {
continue
}
md.SetId()

//re-use our []byte slice to save an allocation.
buf = md.KeyBySeries(buf[:0])
hasher.Reset()
hasher.Write(buf)
Expand Down Expand Up @@ -245,7 +249,7 @@ func (route *GrafanaNet) flush(shard int) {
route.wg.Done()
}

func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricData {
func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) *schema.MetricData {
msg := strings.TrimSpace(string(buf))

elements := strings.Fields(msg)
Expand Down Expand Up @@ -279,7 +283,7 @@ func parseMetric(buf []byte, schemas persister.WhisperSchemas) *schema.MetricDat
Time: int64(timestamp),
Mtype: "gauge",
Tags: []string{},
OrgId: 1, // the hosted tsdb service will adjust to the correct OrgId if using a regular key. orgid 1 is only retained with admin key.
OrgId: orgId, // This may be overwritten by the TSDB-GW if it does not match the orgId of the apiKey used
}
return &md
}
Expand Down

0 comments on commit 785e1dd

Please sign in to comment.