-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
contrib: skeleton app structure & Dogstatsd nsqd addon #909
base: master
Are you sure you want to change the base?
Changes from 6 commits
a399da4
1774d8b
e8af355
158658f
2575596
cd6d865
22b8e72
569ad0e
9e67b56
13303ad
bcbe6da
86b73d0
7f8d6d2
852552c
d727112
f59a2e5
66de333
e176b8a
9b72e38
6f9b481
059888b
5e2012a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package contrib | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"net" | ||
"strings" | ||
"time" | ||
) | ||
|
||
type DataDogClient struct { | ||
conn net.Conn | ||
addr string | ||
prefix string | ||
} | ||
|
||
type DataDogTags struct { | ||
tags map[string]string | ||
} | ||
|
||
// returns dogstatd compatible string | ||
// "#tag1:value1,tag2:value2 | ||
func (ddt *DataDogTags) String() string { | ||
ts := []string{} | ||
for k, v := range ddt.tags { | ||
ts = append(ts, fmt.Sprintf("%s:%s", k, v)) | ||
} | ||
return "#" + strings.Join(ts, ",") | ||
} | ||
|
||
func NewDataDogClient(addr string, prefix string) *DataDogClient { | ||
return &DataDogClient{ | ||
addr: addr, | ||
prefix: prefix, | ||
} | ||
} | ||
|
||
func (c *DataDogClient) String() string { | ||
return c.addr | ||
} | ||
|
||
func (c *DataDogClient) CreateSocket() error { | ||
conn, err := net.DialTimeout("udp", c.addr, time.Second) | ||
if err != nil { | ||
return err | ||
} | ||
c.conn = conn | ||
return nil | ||
} | ||
|
||
func (c *DataDogClient) Close() error { | ||
return c.conn.Close() | ||
} | ||
|
||
func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error { | ||
return c.send(stat, "%d|c", count, tags) | ||
} | ||
|
||
func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error { | ||
return c.send(stat, "%d|c", -count, tags) | ||
} | ||
|
||
func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error { | ||
return c.send(stat, "%d|ms", delta, tags) | ||
} | ||
|
||
func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error { | ||
return c.send(stat, "%d|g", value, tags) | ||
} | ||
|
||
func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error { | ||
if c.conn == nil { | ||
return errors.New("not connected") | ||
} | ||
format = fmt.Sprintf("%s%s:%s|%s", c.prefix, stat, format, tags.String()) | ||
_, err := fmt.Fprintf(c.conn, format, value) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
package contrib |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package contrib | ||
|
||
import ( | ||
"fmt" | ||
"github.com/nsqio/nsq/nsqd" | ||
"time" | ||
) | ||
|
||
type NSQDDogStatsdOptions struct { | ||
DogStatsdAddress string `flag:"dogstatsd-address"` | ||
DogStatsdPrefix string `flag:"dogstatsd-prefix"` | ||
DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` | ||
} | ||
|
||
func NewNSQDDogStatsdDefaultOptions() *NSQDDogStatsdOptions { | ||
return &NSQDDogStatsdOptions{ | ||
DogStatsdPrefix: "nsq.", | ||
DogStatsdInterval: 10 * time.Second, | ||
} | ||
} | ||
|
||
type NSQDDogStatsd struct { | ||
nsqd *nsqd.NSQD | ||
contribOpts *NSQDContribOptions | ||
} | ||
|
||
func (dd *NSQDDogStatsd) Active() bool { | ||
if dd.contribOpts.DogStatsdAddress != "" { | ||
return true | ||
} else { | ||
return false | ||
} | ||
} | ||
|
||
func (dd *NSQDDogStatsd) Start() { | ||
logger.Println("Starting nsqd datadog") | ||
|
||
dd.nsqd.RegisterAddon(dd.Loop) | ||
} | ||
|
||
func (dd *NSQDDogStatsd) Loop() { | ||
// var lastMemStats *nsqd.memStats | ||
var lastStats []nsqd.TopicStats | ||
var stat string | ||
|
||
ticker := time.NewTicker(dd.contribOpts.DogStatsdInterval) | ||
|
||
dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") | ||
exitChan := *dd.nsqd.ExitChan() | ||
|
||
for { | ||
select { | ||
case <-exitChan: | ||
goto exit | ||
case <-ticker.C: | ||
dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING") | ||
|
||
client := NewDataDogClient( | ||
dd.contribOpts.DogStatsdAddress, | ||
dd.contribOpts.DogStatsdPrefix, | ||
) | ||
err := client.CreateSocket() | ||
if err != nil { | ||
dd.nsqd.Logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) | ||
continue | ||
} | ||
|
||
dd.nsqd.Logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) | ||
|
||
stats := dd.nsqd.GetStats() | ||
for _, topic := range stats { | ||
// try to find the topic in the last collection | ||
lastTopic := nsqd.TopicStats{} | ||
for _, checkTopic := range lastStats { | ||
if topic.TopicName == checkTopic.TopicName { | ||
lastTopic = checkTopic | ||
break | ||
} | ||
} | ||
diff := topic.MessageCount - lastTopic.MessageCount | ||
|
||
client.Incr("message_count", int64(diff), &DataDogTags{ | ||
tags: map[string]string{"topic_name": topic.TopicName}, | ||
}) | ||
|
||
client.Gauge("topic.depth", topic.Depth, &DataDogTags{ | ||
tags: map[string]string{"topic_name": topic.TopicName}, | ||
}) | ||
|
||
client.Gauge("topic.backend_depth", topic.BackendDepth, &DataDogTags{ | ||
tags: map[string]string{"topic_name": topic.TopicName}, | ||
}) | ||
|
||
for _, item := range topic.E2eProcessingLatency.Percentiles { | ||
stat = fmt.Sprintf("topic.e2e_processing_latency_%.0f", item["quantile"]*100.0) | ||
// We can cast the value to int64 since a value of 1 is the | ||
// minimum resolution we will have, so there is no loss of | ||
// accuracy | ||
client.Gauge(stat, int64(item["value"]), &DataDogTags{ | ||
tags: map[string]string{"topic_name": topic.TopicName}, | ||
}) | ||
} | ||
|
||
for _, channel := range topic.Channels { | ||
// try to find the channel in the last collection | ||
lastChannel := nsqd.ChannelStats{} | ||
for _, checkChannel := range lastTopic.Channels { | ||
if channel.ChannelName == checkChannel.ChannelName { | ||
lastChannel = checkChannel | ||
break | ||
} | ||
} | ||
diff := channel.MessageCount - lastChannel.MessageCount | ||
client.Incr("channel.message_count", int64(diff), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
client.Gauge("channel.depth", channel.Depth, &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
client.Gauge("channel.backend_depth", channel.BackendDepth, &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
// stat = fmt.Sprintf("topic.%s.channel.%s.in_flight_count", topic.TopicName, channel.ChannelName) | ||
client.Gauge("channel.in_flight_count", int64(channel.InFlightCount), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
// stat = fmt.Sprintf("topic.%s.channel.%s.deferred_count", topic.TopicName, channel.ChannelName) | ||
client.Gauge("channel.deferred_count", int64(channel.DeferredCount), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
diff = channel.RequeueCount - lastChannel.RequeueCount | ||
// stat = fmt.Sprintf("topic.%s.channel.%s.requeue_count", topic.TopicName, channel.ChannelName) | ||
client.Incr("channel.requeue_count", int64(diff), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
diff = channel.TimeoutCount - lastChannel.TimeoutCount | ||
// stat = fmt.Sprintf("topic.%s.channel.%s.timeout_count", topic.TopicName, channel.ChannelName) | ||
client.Incr("channel.timeout_count", int64(diff), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
// stat = fmt.Sprintf("topic.%s.channel.%s.clients", topic.TopicName, channel.ChannelName) | ||
client.Gauge("channel.clients", int64(len(channel.Clients)), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
|
||
for _, item := range channel.E2eProcessingLatency.Percentiles { | ||
stat = fmt.Sprintf("channel.e2e_processing_latency_%.0f", item["quantile"]*100.0) | ||
client.Gauge(stat, int64(item["value"]), &DataDogTags{ | ||
tags: map[string]string{ | ||
"topic_name": topic.TopicName, | ||
"channel_name": channel.ChannelName, | ||
}, | ||
}) | ||
} | ||
} | ||
} | ||
lastStats = stats | ||
client.Close() | ||
} | ||
} | ||
|
||
exit: | ||
ticker.Stop() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package contrib | ||
|
||
import ( | ||
"flag" | ||
"github.com/nsqio/nsq/nsqd" | ||
"log" | ||
"os" | ||
"time" | ||
) | ||
|
||
var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) | ||
|
||
type INSQDAddon interface { | ||
Active() bool | ||
Start() | ||
} | ||
|
||
type NSQDContribOptions struct { | ||
*NSQDDogStatsdOptions | ||
} | ||
|
||
// Instantiates all contrib default options | ||
func NewContribOptions() *NSQDContribOptions { | ||
return &NSQDContribOptions{ | ||
NewNSQDDogStatsdDefaultOptions(), | ||
} | ||
} | ||
|
||
func AddNSQDContribFlags(opts *NSQDContribOptions, flagSet *flag.FlagSet) { | ||
flagSet.String("dogstatsd-address", opts.DogStatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats") | ||
flagSet.Duration("dogstatsd-interval", opts.DogStatsdInterval, "duration between pushing to dogstatsd") | ||
// flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") | ||
flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)") | ||
} | ||
|
||
type NSQDAddons struct { | ||
addons []INSQDAddon | ||
} | ||
|
||
// Starts all addons that are active | ||
func (as *NSQDAddons) Start() { | ||
logger.Println("Starting All addons") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should somehow go through |
||
|
||
for _, addon := range as.addons { | ||
if addon.Active() { | ||
addon.Start() | ||
} | ||
} | ||
} | ||
|
||
func NewNSQDAddons(contribOpts *NSQDContribOptions, nsqd *nsqd.NSQD) *NSQDAddons { | ||
return &NSQDAddons{ | ||
addons: []INSQDAddon{ | ||
&NSQDDogStatsd{ | ||
contribOpts: contribOpts, | ||
nsqd: nsqd, | ||
}, | ||
}, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,3 +18,9 @@ func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) { | |||||
opts := n.getOpts() | ||||||
lg.Logf(opts.Logger, opts.logLevel, level, f, args...) | ||||||
} | ||||||
|
||||||
// would like to expose logf to the contrib modules so that contrib can share the | ||||||
// configuration end user specifies on CLI | ||||||
func (n *NSQD) Logf(level lg.LogLevel, f string, args ...interface{}) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A possibly cleaner alternative is to pass the Line 231 in 844c6a0
nsq/internal/protocol/tcp_server.go Line 15 in 844c6a0
|
||||||
n.logf(level, f, args...) | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
best not to add unrelated whitespace changes here