Skip to content

Commit

Permalink
Merge pull request #160 from jnjackins/master
Browse files Browse the repository at this point in the history
Implement AMQP input
  • Loading branch information
Dieterbe authored Feb 13, 2017
2 parents 839cc19 + 3d62987 commit dd19f4e
Show file tree
Hide file tree
Showing 27 changed files with 8,363 additions and 0 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ if connection is down and spooling enabled. we try to spool but if it's slow we
if connection is down and spooling disabled -> drop the data


Input
-----

As with the Python implementation of carbon-relay, metrics can be pushed to carbon-relay-ng via TCP
(plain text or pickle) or by using an AMQP broker such as RabbitMQ. To send metrics via AMQP, create
a topic exchange (named "metrics" in the example carbon-relay-ng.ini) and publish messages to it in
the usual metric format: `<metric path> <metric value> <metric timestamp>`. An exclusive, ephemeral
queue will automatically be created and bound to the exchange, which carbon-relay-ng will consume from.


Validation
----------

Expand Down
11 changes: 11 additions & 0 deletions cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Config struct {
Admin_addr string
Http_addr string
Spool_dir string
Amqp Amqp
Max_procs int
First_only bool
Routes []*route.Route
Expand All @@ -25,6 +26,16 @@ type Config struct {
Validate_order bool
}

type Amqp struct {
Amqp_enabled bool
Amqp_host string
Amqp_port int
Amqp_vhost string
Amqp_user string
Amqp_password string
Amqp_exchange string
}

type instrumentation struct {
Graphite_addr string
Graphite_interval int
Expand Down
4 changes: 4 additions & 0 deletions cmd/carbon-relay-ng/carbon-relay-ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func main() {
}
}

if config.Amqp.Amqp_enabled == true {
go input.StartAMQP(config, table, badMetrics)
}

if config.Admin_addr != "" {
go func() {
err := telnet.Start(config.Admin_addr, table)
Expand Down
10 changes: 10 additions & 0 deletions examples/carbon-relay-ng.ini
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ init = [
'addRoute sendFirstMatch analytics regex=(Err/s|wait_time|logger) graphite.prod:2003 prefix=prod. spool=true pickle=true graphite.staging:2003 prefix=staging. spool=true pickle=true'
]

# consume metrics via AMQP
[amqp]
amqp_enabled = false
amqp_host = "localhost"
amqp_port = 5672
amqp_user = "guest"
amqp_password = "guest"
amqp_vhost = "/graphite"
amqp_exchange = "metrics"

[instrumentation]
# in addition to serving internal metrics via expvar, you can optionally send em to graphite
graphite_addr = "" # localhost:2003 (how about feeding back into the relay itself? :)
Expand Down
138 changes: 138 additions & 0 deletions input/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package input

import (
"time"

"github.com/graphite-ng/carbon-relay-ng/badmetrics"
"github.com/graphite-ng/carbon-relay-ng/cfg"
"github.com/graphite-ng/carbon-relay-ng/table"
"github.com/graphite-ng/carbon-relay-ng/validate"
"github.com/jpillora/backoff"
m20 "github.com/metrics20/go-metrics20/carbon20"
"github.com/streadway/amqp"
)

type Amqp struct {
uri amqp.URI
conn *amqp.Connection
channel *amqp.Channel

config cfg.Config
bad *badmetrics.BadMetrics
table *table.Table
}

func (a *Amqp) close() {
a.channel.Close()
a.conn.Close()
}

func StartAMQP(config cfg.Config, tbl *table.Table, bad *badmetrics.BadMetrics) {
uri := amqp.URI{
Scheme: "amqp",
Host: config.Amqp.Amqp_host,
Port: config.Amqp.Amqp_port,
Username: config.Amqp.Amqp_user,
Password: config.Amqp.Amqp_password,
Vhost: config.Amqp.Amqp_vhost,
}

a := &Amqp{
uri: uri,
config: config,
bad: bad,
table: tbl,
}

b := &backoff.Backoff{
Min: 500 * time.Millisecond,
}
for {
c, err := connectAMQP(a)
if err != nil {
// failed to connect; backoff and try again
log.Error("connectAMQP: %v", err)

d := b.Duration()
log.Info("retrying in %v", d)
time.Sleep(d)
} else {
// connected successfully; reset backoff
b.Reset()

// blocks until channel is closed
consumeAMQP(a, c)
log.Notice("consumeAMQP: channel closed")

// reconnect immediately
a.close()
}
}
}

func connectAMQP(a *Amqp) (<-chan amqp.Delivery, error) {
log.Notice("dialing AMQP: %v", a.uri)
conn, err := amqp.Dial(a.uri.String())
if err != nil {
return nil, err
}
a.conn = conn

amqpChan, err := conn.Channel()
if err != nil {
a.conn.Close()
return nil, err
}
a.channel = amqpChan

// queue name will be random, as in the python implementation
q, err := amqpChan.QueueDeclare("", false, false, true, false, nil)
if err != nil {
a.close()
return nil, err
}

err = amqpChan.QueueBind(q.Name, "#", a.config.Amqp.Amqp_exchange, false, nil)
if err != nil {
a.close()
return nil, err
}

c, err := amqpChan.Consume(q.Name, "carbon-relay-ng", true, true, true, false, nil)
if err != nil {
a.close()
return nil, err
}

return c, nil
}

func consumeAMQP(a *Amqp, c <-chan amqp.Delivery) {
log.Notice("consuming AMQP messages")
for m := range c {
a.dispatch(m.Body)
}
}

func (a *Amqp) dispatch(buf []byte) {
numIn.Inc(1)
log.Debug("dispatching message: %s", buf)

key, _, ts, err := m20.ValidatePacket(buf, a.config.Validation_level_legacy.Level, a.config.Validation_level_m20.Level)
if err != nil {
a.bad.Add(key, buf, err)
numInvalid.Inc(1)
return
}

if a.config.Validate_order {
err = validate.Ordered(key, ts)
if err != nil {
a.bad.Add(key, buf, err)
numOutOfOrder.Inc(1)
return
}
}

a.table.Dispatch(buf)
}
35 changes: 35 additions & 0 deletions vendor/github.com/streadway/amqp/CONTRIBUTING.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/github.com/streadway/amqp/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions vendor/github.com/streadway/amqp/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit dd19f4e

Please sign in to comment.