From a399da42dd87a0712086a3aafdfc0bc82392860f Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 19 Jun 2017 11:38:31 -0400 Subject: [PATCH 01/22] contrib: skeleton app structure * contrib package with nsqd contrib * skeleton interfaces * skeleton datadogd contrib package * explicitly adds opts to nsqd --- apps/nsqd/nsqd.go | 12 ++++++++++++ contrib/dogstatsd.go | 19 +++++++++++++++++++ contrib/nsqd.go | 42 ++++++++++++++++++++++++++++++++++++++++++ nsqd/options.go | 9 +++++++++ 4 files changed, 82 insertions(+) create mode 100644 contrib/dogstatsd.go create mode 100644 contrib/nsqd.go diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 575bbdfd2..810f58425 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -19,6 +19,7 @@ import ( "github.com/nsqio/nsq/internal/app" "github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/nsqd" + "github.com/nsqio/nsq/contrib" ) type tlsRequiredOption int @@ -144,6 +145,13 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)") flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)") + // contrib + // TODO cleanly extend this in the the contrib app + flagSet.String("dogstatsd-address", opts.DogStatsdAddress, "UDP : of a statsd daemon for pushing stats") + flagSet.Duration("dogstatsd-interval", opts.DogStatsdInterval, "duration between pushing to dogstatsd") + // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") + flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)") + return flagSet } @@ -232,6 +240,10 @@ func (p *program) Start() error { } nsqd.Main() + // hook into addons + addons := contrib.NewNSQDAddons() + addons.Start(opts, nsqd) + p.nsqd = nsqd return nil } diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go new file mode 100644 index 000000000..88fe0a1d2 --- /dev/null +++ b/contrib/dogstatsd.go @@ -0,0 +1,19 @@ +package contrib + +import "github.com/nsqio/nsq/nsqd" + + +type NSQDDogStatsd struct {} + + +func (dd *NSQDDogStatsd) Active(opts *nsqd.Options) bool { + if opts.DogStatsdAddress != "" { + return true + } else { + return false + } +} + +func (dd *NSQDDogStatsd) Start(n *nsqd.NSQD) { + logger.Println("Starting nsqd datadog") +} diff --git a/contrib/nsqd.go b/contrib/nsqd.go new file mode 100644 index 000000000..5f31c9d6b --- /dev/null +++ b/contrib/nsqd.go @@ -0,0 +1,42 @@ +package contrib + +import ( + "github.com/nsqio/nsq/nsqd" + "log" + "os" +) + + +var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) + + +type INSQDAddon interface { + Active(opts *nsqd.Options) bool + Start(*nsqd.NSQD) +} + + +type NSQDAddons struct { + addons []INSQDAddon +} + + +// Starts all addons that are active +func (as *NSQDAddons) Start(opts *nsqd.Options, n *nsqd.NSQD) { + logger.Println("Starting All addons") + + for _, addon := range as.addons { + if addon.Active(opts) { + addon.Start(n) + } + } +} + +func NewNSQDAddons() *NSQDAddons { + return &NSQDAddons{ + addons: []INSQDAddon{ + &NSQDDogStatsd{}, + }, + } +} + diff --git a/nsqd/options.go b/nsqd/options.go index ee461edb9..f00f523eb 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -79,6 +79,12 @@ type Options struct { DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` + + // contrib + // TODO cleanly extend this in the the contrib app + DogStatsdAddress string `flag:"dogstatsd-address"` + DogStatsdPrefix string `flag:"dogstatsd-prefix"` + DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` } func NewOptions() *Options { @@ -141,5 +147,8 @@ func NewOptions() *Options { SnappyEnabled: true, TLSMinVersion: tls.VersionTLS10, + + DogStatsdPrefix: "nsq.%s", + DogStatsdInterval: 60 * time.Second, } } From 1774d8b04cc51726106ba008302ae889986c91c6 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 19 Jun 2017 12:27:10 -0400 Subject: [PATCH 02/22] contrib: * addons are instantiated with reference to nsqd * nsqd exposes exit channel reference * datadog contrib skeletal loop --- apps/nsqd/nsqd.go | 4 ++-- contrib/dogstatsd.go | 39 ++++++++++++++++++++++++++++++++++----- contrib/nsqd.go | 18 +++++++++++------- nsqd/nsqd.go | 8 ++++++++ nsqd/options.go | 2 +- 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 810f58425..db23c9725 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -241,8 +241,8 @@ func (p *program) Start() error { nsqd.Main() // hook into addons - addons := contrib.NewNSQDAddons() - addons.Start(opts, nsqd) + addons := contrib.NewNSQDAddons(opts, nsqd) + addons.Start() p.nsqd = nsqd return nil diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index 88fe0a1d2..7149a98dc 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -1,19 +1,48 @@ package contrib -import "github.com/nsqio/nsq/nsqd" +import ( + "github.com/nsqio/nsq/nsqd" + "time" +) -type NSQDDogStatsd struct {} +type NSQDDogStatsd struct { + opts *nsqd.Options + nsqd *nsqd.NSQD +} -func (dd *NSQDDogStatsd) Active(opts *nsqd.Options) bool { - if opts.DogStatsdAddress != "" { +func (dd *NSQDDogStatsd) Active() bool { + if dd.opts.DogStatsdAddress != "" { return true } else { return false } } -func (dd *NSQDDogStatsd) Start(n *nsqd.NSQD) { +func (dd *NSQDDogStatsd) Start() { logger.Println("Starting nsqd datadog") + + dd.nsqd.RegisterAddon(dd.Loop) +} + +func (dd *NSQDDogStatsd) Loop() { + ticker := time.NewTicker(dd.opts.DogStatsdInterval) + logger.Println("Loop started") + exitChan := *dd.nsqd.ExitChan() + + for { + select { + case <- exitChan: + goto exit + case <- ticker.C: + logger.Println("LOOPING") + } + } + +exit: + ticker.Stop() } + + + diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 5f31c9d6b..f541e0ba7 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -11,8 +11,8 @@ var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) type INSQDAddon interface { - Active(opts *nsqd.Options) bool - Start(*nsqd.NSQD) + Active() bool + Start() } @@ -22,20 +22,24 @@ type NSQDAddons struct { // Starts all addons that are active -func (as *NSQDAddons) Start(opts *nsqd.Options, n *nsqd.NSQD) { +func (as *NSQDAddons) Start() { logger.Println("Starting All addons") for _, addon := range as.addons { - if addon.Active(opts) { - addon.Start(n) + if addon.Active() { + addon.Start() } } } -func NewNSQDAddons() *NSQDAddons { + +func NewNSQDAddons(opts *nsqd.Options, nsqd *nsqd.NSQD) *NSQDAddons { return &NSQDAddons{ addons: []INSQDAddon{ - &NSQDDogStatsd{}, + &NSQDDogStatsd{ + opts: opts, + nsqd: nsqd, + }, }, } } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 1649d5d25..7a16e8d86 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -154,6 +154,10 @@ func New(opts *Options) *NSQD { return n } +func (n *NSQD) ExitChan() *chan int { + return &n.exitChan +} + func (n *NSQD) getOpts() *Options { return n.opts.Load().(*Options) } @@ -265,6 +269,10 @@ func (n *NSQD) Main() { } } +func (n *NSQD) RegisterAddon(addonFn func()) { + n.waitGroup.Wrap(func() { addonFn() }) +} + type meta struct { Topics []struct { Name string `json:"name"` diff --git a/nsqd/options.go b/nsqd/options.go index f00f523eb..fc19a508d 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -149,6 +149,6 @@ func NewOptions() *Options { TLSMinVersion: tls.VersionTLS10, DogStatsdPrefix: "nsq.%s", - DogStatsdInterval: 60 * time.Second, + DogStatsdInterval: 10 * time.Second, } } From e8af355447b4065020df2ce713edc4fb23615ae5 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 19 Jun 2017 12:47:47 -0400 Subject: [PATCH 03/22] contrib: nsqd exposes its logger for nsqd addonons --- contrib/dogstatsd.go | 4 ++-- nsqd/logger.go | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index 7149a98dc..50fb979fa 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -28,7 +28,7 @@ func (dd *NSQDDogStatsd) Start() { func (dd *NSQDDogStatsd) Loop() { ticker := time.NewTicker(dd.opts.DogStatsdInterval) - logger.Println("Loop started") + dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") exitChan := *dd.nsqd.ExitChan() for { @@ -36,7 +36,7 @@ func (dd *NSQDDogStatsd) Loop() { case <- exitChan: goto exit case <- ticker.C: - logger.Println("LOOPING") + dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING") } } diff --git a/nsqd/logger.go b/nsqd/logger.go index 51ac28db7..e74d5f05d 100644 --- a/nsqd/logger.go +++ b/nsqd/logger.go @@ -18,3 +18,10 @@ func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) { opts := n.getOpts() lg.Logf(opts.Logger, opts.logLevel, level, f, args...) } + +// would like to expose logf to the contrib modules so that contrib can share the +// configuration end user specifies on CLI +func (n *NSQD) Logf(level lg.LogLevel, f string, args ...interface{}) { + n.logf(level, f, args...) +} + From 158658fa4fedd16294edd251db5f9ead681b3f1c Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 19 Jun 2017 13:32:14 -0400 Subject: [PATCH 04/22] contrib: flags and opts are all in contrib package now --- apps/nsqd/nsqd.go | 18 ++++++++++-------- contrib/dogstatsd.go | 14 +++++++++++--- contrib/nsqd.go | 28 +++++++++++++++++++++++++--- nsqd/options.go | 9 --------- 4 files changed, 46 insertions(+), 23 deletions(-) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index db23c9725..fdd606907 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -145,13 +145,6 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)") flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)") - // contrib - // TODO cleanly extend this in the the contrib app - flagSet.String("dogstatsd-address", opts.DogStatsdAddress, "UDP : of a statsd daemon for pushing stats") - flagSet.Duration("dogstatsd-interval", opts.DogStatsdInterval, "duration between pushing to dogstatsd") - // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") - flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)") - return flagSet } @@ -208,6 +201,13 @@ func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) + + // add the contrib options to these + contribOpts := contrib.NewContribOptions() + // add the flags for each of contrib + contrib.AddNSQDContribFlags(contribOpts, flagSet) + + // add the contrib flags to these flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) @@ -228,6 +228,8 @@ func (p *program) Start() error { cfg.Validate() options.Resolve(opts, flagSet, cfg) + options.Resolve(contribOpts, flagSet, cfg) + nsqd := nsqd.New(opts) err := nsqd.LoadMetadata() @@ -241,7 +243,7 @@ func (p *program) Start() error { nsqd.Main() // hook into addons - addons := contrib.NewNSQDAddons(opts, nsqd) + addons := contrib.NewNSQDAddons(contribOpts, nsqd) addons.Start() p.nsqd = nsqd diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index 50fb979fa..0c9aadedf 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -6,14 +6,22 @@ import ( ) +type NSQDDogStatsdOptions struct { + DogStatsdAddress string `flag:"dogstatsd-address"` + DogStatsdPrefix string `flag:"dogstatsd-prefix"` + DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` +} + + type NSQDDogStatsd struct { - opts *nsqd.Options nsqd *nsqd.NSQD + contribOpts *NSQDContribOptions } func (dd *NSQDDogStatsd) Active() bool { - if dd.opts.DogStatsdAddress != "" { + dd.nsqd.Logf(nsqd.LOG_ERROR, "%s", dd.contribOpts) + if dd.contribOpts.DogStatsdAddress != "" { return true } else { return false @@ -27,7 +35,7 @@ func (dd *NSQDDogStatsd) Start() { } func (dd *NSQDDogStatsd) Loop() { - ticker := time.NewTicker(dd.opts.DogStatsdInterval) + ticker := time.NewTicker(dd.contribOpts.DogStatsdInterval) dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") exitChan := *dd.nsqd.ExitChan() diff --git a/contrib/nsqd.go b/contrib/nsqd.go index f541e0ba7..d12117be3 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -4,6 +4,8 @@ import ( "github.com/nsqio/nsq/nsqd" "log" "os" + "flag" + "time" ) @@ -15,12 +17,32 @@ type INSQDAddon interface { Start() } +type NSQDContribOptions struct { + *NSQDDogStatsdOptions +} + +// Instantiates all contrib default options +func NewContribOptions() *NSQDContribOptions { + return &NSQDContribOptions{ + &NSQDDogStatsdOptions{ + DogStatsdPrefix: "nsq.%s", + DogStatsdInterval: 10 * time.Second, + }, + } +} + +func AddNSQDContribFlags(opts *NSQDContribOptions, flagSet *flag.FlagSet) { + flagSet.String("dogstatsd-address", opts.DogStatsdAddress, "UDP : of a statsd daemon for pushing stats") + flagSet.Duration("dogstatsd-interval", opts.DogStatsdInterval, "duration between pushing to dogstatsd") + // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") + flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)") +} + type NSQDAddons struct { addons []INSQDAddon } - // Starts all addons that are active func (as *NSQDAddons) Start() { logger.Println("Starting All addons") @@ -33,11 +55,11 @@ func (as *NSQDAddons) Start() { } -func NewNSQDAddons(opts *nsqd.Options, nsqd *nsqd.NSQD) *NSQDAddons { +func NewNSQDAddons(contribOpts *NSQDContribOptions, nsqd *nsqd.NSQD) *NSQDAddons { return &NSQDAddons{ addons: []INSQDAddon{ &NSQDDogStatsd{ - opts: opts, + contribOpts: contribOpts, nsqd: nsqd, }, }, diff --git a/nsqd/options.go b/nsqd/options.go index fc19a508d..ee461edb9 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -79,12 +79,6 @@ type Options struct { DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` - - // contrib - // TODO cleanly extend this in the the contrib app - DogStatsdAddress string `flag:"dogstatsd-address"` - DogStatsdPrefix string `flag:"dogstatsd-prefix"` - DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` } func NewOptions() *Options { @@ -147,8 +141,5 @@ func NewOptions() *Options { SnappyEnabled: true, TLSMinVersion: tls.VersionTLS10, - - DogStatsdPrefix: "nsq.%s", - DogStatsdInterval: 10 * time.Second, } } From 257559685934566c09bb06fe903be3ed99653f84 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 19 Jun 2017 15:35:12 -0400 Subject: [PATCH 05/22] contrib: datadog metrics * datadog client in contrib * statsd topic/channel stats --- contrib/datadog_client.go | 78 +++++++++++++++++ contrib/datadog_client_test.go | 1 + contrib/dogstatsd.go | 152 ++++++++++++++++++++++++++++++--- contrib/nsqd.go | 11 +-- nsqd/logger.go | 1 - nsqd/nsqd.go | 2 +- 6 files changed, 225 insertions(+), 20 deletions(-) create mode 100644 contrib/datadog_client.go create mode 100644 contrib/datadog_client_test.go diff --git a/contrib/datadog_client.go b/contrib/datadog_client.go new file mode 100644 index 000000000..bd1446c99 --- /dev/null +++ b/contrib/datadog_client.go @@ -0,0 +1,78 @@ +package contrib + +import ( + "errors" + "fmt" + "net" + "strings" + "time" +) + +type DataDogClient struct { + conn net.Conn + addr string + prefix string +} + +type DataDogTags struct { + tags map[string]string +} + +// returns dogstatd compatible string +// "#tag1:value1,tag2:value2 +func (ddt *DataDogTags) String() string { + ts := []string{} + for k, v := range ddt.tags { + ts = append(ts, fmt.Sprintf("%s:%s", k, v)) + } + return "#" + strings.Join(ts, ",") +} + +func NewDataDogClient(addr string, prefix string) *DataDogClient { + return &DataDogClient{ + addr: addr, + prefix: prefix, + } +} + +func (c *DataDogClient) String() string { + return c.addr +} + +func (c *DataDogClient) CreateSocket() error { + conn, err := net.DialTimeout("udp", c.addr, time.Second) + if err != nil { + return err + } + c.conn = conn + return nil +} + +func (c *DataDogClient) Close() error { + return c.conn.Close() +} + +func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error { + return c.send(stat, "%d|c", count, tags) +} + +func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error { + return c.send(stat, "%d|c", -count, tags) +} + +func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error { + return c.send(stat, "%d|ms", delta, tags) +} + +func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error { + return c.send(stat, "%d|g", value, tags) +} + +func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error { + if c.conn == nil { + return errors.New("not connected") + } + format = fmt.Sprintf("%s%s:%s|%s", c.prefix, stat, format, tags.String()) + _, err := fmt.Fprintf(c.conn, format, value) + return err +} diff --git a/contrib/datadog_client_test.go b/contrib/datadog_client_test.go new file mode 100644 index 000000000..9e5ee6c24 --- /dev/null +++ b/contrib/datadog_client_test.go @@ -0,0 +1 @@ +package contrib diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index 0c9aadedf..b9e6c68db 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -1,26 +1,23 @@ package contrib import ( + "fmt" "github.com/nsqio/nsq/nsqd" "time" ) - type NSQDDogStatsdOptions struct { DogStatsdAddress string `flag:"dogstatsd-address"` DogStatsdPrefix string `flag:"dogstatsd-prefix"` DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` } - type NSQDDogStatsd struct { - nsqd *nsqd.NSQD + nsqd *nsqd.NSQD contribOpts *NSQDContribOptions } - func (dd *NSQDDogStatsd) Active() bool { - dd.nsqd.Logf(nsqd.LOG_ERROR, "%s", dd.contribOpts) if dd.contribOpts.DogStatsdAddress != "" { return true } else { @@ -35,22 +32,157 @@ func (dd *NSQDDogStatsd) Start() { } func (dd *NSQDDogStatsd) Loop() { + // var lastMemStats *nsqd.memStats + var lastStats []nsqd.TopicStats + var stat string + ticker := time.NewTicker(dd.contribOpts.DogStatsdInterval) + dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") exitChan := *dd.nsqd.ExitChan() for { select { - case <- exitChan: + case <-exitChan: goto exit - case <- ticker.C: + case <-ticker.C: dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING") + + client := NewDataDogClient( + dd.contribOpts.DogStatsdAddress, + dd.contribOpts.DogStatsdPrefix, + ) + err := client.CreateSocket() + if err != nil { + dd.nsqd.Logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) + continue + } + + dd.nsqd.Logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) + + stats := dd.nsqd.GetStats() + for _, topic := range stats { + // try to find the topic in the last collection + lastTopic := nsqd.TopicStats{} + for _, checkTopic := range lastStats { + if topic.TopicName == checkTopic.TopicName { + lastTopic = checkTopic + break + } + } + diff := topic.MessageCount - lastTopic.MessageCount + + client.Incr("message_count", int64(diff), &DataDogTags{ + tags: map[string]string{"topic_name": topic.TopicName}, + }) + + client.Gauge("topic.depth", topic.Depth, &DataDogTags{ + tags: map[string]string{"topic_name": topic.TopicName}, + }) + + client.Gauge("topic.backend_depth", topic.BackendDepth, &DataDogTags{ + tags: map[string]string{"topic_name": topic.TopicName}, + }) + + for _, item := range topic.E2eProcessingLatency.Percentiles { + stat = fmt.Sprintf("topic.e2e_processing_latency_%.0f", item["quantile"]*100.0) + // We can cast the value to int64 since a value of 1 is the + // minimum resolution we will have, so there is no loss of + // accuracy + client.Gauge(stat, int64(item["value"]), &DataDogTags{ + tags: map[string]string{"topic_name": topic.TopicName}, + }) + } + + for _, channel := range topic.Channels { + // try to find the channel in the last collection + lastChannel := nsqd.ChannelStats{} + for _, checkChannel := range lastTopic.Channels { + if channel.ChannelName == checkChannel.ChannelName { + lastChannel = checkChannel + break + } + } + diff := channel.MessageCount - lastChannel.MessageCount + client.Incr("channel.message_count", int64(diff), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + client.Gauge("channel.depth", channel.Depth, &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + client.Gauge("channel.backend_depth", channel.BackendDepth, &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + // stat = fmt.Sprintf("topic.%s.channel.%s.in_flight_count", topic.TopicName, channel.ChannelName) + client.Gauge("channel.in_flight_count", int64(channel.InFlightCount), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + // stat = fmt.Sprintf("topic.%s.channel.%s.deferred_count", topic.TopicName, channel.ChannelName) + client.Gauge("channel.deferred_count", int64(channel.DeferredCount), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + diff = channel.RequeueCount - lastChannel.RequeueCount + // stat = fmt.Sprintf("topic.%s.channel.%s.requeue_count", topic.TopicName, channel.ChannelName) + client.Incr("channel.requeue_count", int64(diff), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + diff = channel.TimeoutCount - lastChannel.TimeoutCount + // stat = fmt.Sprintf("topic.%s.channel.%s.timeout_count", topic.TopicName, channel.ChannelName) + client.Incr("channel.timeout_count", int64(diff), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + // stat = fmt.Sprintf("topic.%s.channel.%s.clients", topic.TopicName, channel.ChannelName) + client.Gauge("channel.clients", int64(len(channel.Clients)), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + + for _, item := range channel.E2eProcessingLatency.Percentiles { + stat = fmt.Sprintf("channel.e2e_processing_latency_%.0f", item["quantile"]*100.0) + client.Gauge(stat, int64(item["value"]), &DataDogTags{ + tags: map[string]string{ + "topic_name": topic.TopicName, + "channel_name": channel.ChannelName, + }, + }) + } + } + } + lastStats = stats + client.Close() } } exit: ticker.Stop() } - - - diff --git a/contrib/nsqd.go b/contrib/nsqd.go index d12117be3..101697cd3 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -1,17 +1,15 @@ package contrib import ( + "flag" "github.com/nsqio/nsq/nsqd" "log" "os" - "flag" "time" ) - var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) - type INSQDAddon interface { Active() bool Start() @@ -25,7 +23,7 @@ type NSQDContribOptions struct { func NewContribOptions() *NSQDContribOptions { return &NSQDContribOptions{ &NSQDDogStatsdOptions{ - DogStatsdPrefix: "nsq.%s", + DogStatsdPrefix: "nsq.", DogStatsdInterval: 10 * time.Second, }, } @@ -38,7 +36,6 @@ func AddNSQDContribFlags(opts *NSQDContribOptions, flagSet *flag.FlagSet) { flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)") } - type NSQDAddons struct { addons []INSQDAddon } @@ -54,15 +51,13 @@ func (as *NSQDAddons) Start() { } } - func NewNSQDAddons(contribOpts *NSQDContribOptions, nsqd *nsqd.NSQD) *NSQDAddons { return &NSQDAddons{ addons: []INSQDAddon{ &NSQDDogStatsd{ contribOpts: contribOpts, - nsqd: nsqd, + nsqd: nsqd, }, }, } } - diff --git a/nsqd/logger.go b/nsqd/logger.go index e74d5f05d..a2b161695 100644 --- a/nsqd/logger.go +++ b/nsqd/logger.go @@ -24,4 +24,3 @@ func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) { func (n *NSQD) Logf(level lg.LogLevel, f string, args ...interface{}) { n.logf(level, f, args...) } - diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 7a16e8d86..fe86757b0 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -270,7 +270,7 @@ func (n *NSQD) Main() { } func (n *NSQD) RegisterAddon(addonFn func()) { - n.waitGroup.Wrap(func() { addonFn() }) + n.waitGroup.Wrap(func() { addonFn() }) } type meta struct { From cd6d865dfce4e7f7607919cefc3076cae92fcf22 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 19 Jun 2017 16:23:44 -0400 Subject: [PATCH 06/22] contrib: dogstatsd defines default options instead of contrib/nsqd.go --- contrib/dogstatsd.go | 7 +++++++ contrib/nsqd.go | 5 +---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index b9e6c68db..ba783f7f7 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -12,6 +12,13 @@ type NSQDDogStatsdOptions struct { DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` } +func NewNSQDDogStatsdDefaultOptions() *NSQDDogStatsdOptions { + return &NSQDDogStatsdOptions{ + DogStatsdPrefix: "nsq.", + DogStatsdInterval: 10 * time.Second, + } +} + type NSQDDogStatsd struct { nsqd *nsqd.NSQD contribOpts *NSQDContribOptions diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 101697cd3..7ff0d2025 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -22,10 +22,7 @@ type NSQDContribOptions struct { // Instantiates all contrib default options func NewContribOptions() *NSQDContribOptions { return &NSQDContribOptions{ - &NSQDDogStatsdOptions{ - DogStatsdPrefix: "nsq.", - DogStatsdInterval: 10 * time.Second, - }, + NewNSQDDogStatsdDefaultOptions(), } } From 22b8e72835cdcd11500e313ca73999662c9e2008 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 12:35:46 -0400 Subject: [PATCH 07/22] nsq contrib: * exposes single new option on nsqd * pushes option handling down to each individual optional module --- apps/nsqd/nsqd.go | 13 ++++---- contrib/dogstatsd.go | 70 +++++++++++++++++++++++++++++++++++--------- contrib/nsqd.go | 48 ++++++++++-------------------- nsqd/options.go | 5 ++++ 4 files changed, 82 insertions(+), 54 deletions(-) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index fdd606907..c5a17285c 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -145,6 +145,10 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)") flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)") + + optModulesOptions := app.StringArray{} + flagSet.Var(&optModulesOptions, "mod-opt", "optional module options, of form: --mod-opt={{moduleName}}={{moduleOpt}}={{moduleOptValue}}") + return flagSet } @@ -202,12 +206,6 @@ func (p *program) Start() error { flagSet := nsqdFlagSet(opts) - // add the contrib options to these - contribOpts := contrib.NewContribOptions() - // add the flags for each of contrib - contrib.AddNSQDContribFlags(contribOpts, flagSet) - - // add the contrib flags to these flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) @@ -228,7 +226,6 @@ func (p *program) Start() error { cfg.Validate() options.Resolve(opts, flagSet, cfg) - options.Resolve(contribOpts, flagSet, cfg) nsqd := nsqd.New(opts) @@ -243,7 +240,7 @@ func (p *program) Start() error { nsqd.Main() // hook into addons - addons := contrib.NewNSQDAddons(contribOpts, nsqd) + addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd) addons.Start() p.nsqd = nsqd diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index ba783f7f7..5b6da40ba 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/nsqio/nsq/nsqd" "time" + "flag" + "github.com/nsqio/nsq/internal/lg" ) type NSQDDogStatsdOptions struct { @@ -12,20 +14,60 @@ type NSQDDogStatsdOptions struct { DogStatsdInterval time.Duration `flag:"dogstatsd-interval"` } -func NewNSQDDogStatsdDefaultOptions() *NSQDDogStatsdOptions { - return &NSQDDogStatsdOptions{ - DogStatsdPrefix: "nsq.", - DogStatsdInterval: 10 * time.Second, +func NewNSQDDogStatsdContribFlags(opts *NSQDDogStatsdOptions) *flag.FlagSet { + flagSet := flag.NewFlagSet("dogstatsd", flag.ExitOnError) + flagSet.StringVar( + &opts.DogStatsdAddress, + "dogstatsd-address", + "", + "UDP : of a statsd daemon for pushing stats", + ) + flagSet.DurationVar( + &opts.DogStatsdInterval, + "dogstatsd-interval", + 10 * time.Second, + "duration between pushing to dogstatsd", + ) + // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") + flagSet.StringVar( + &opts.DogStatsdPrefix, + "dogstatsd-prefix", + "nsq.", + "prefix used for keys sent to statsd (%s for host replacement)", + ) + return flagSet +} + +func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD) INSQDAddon { + n.Logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) + + dogStatsdOpts := &NSQDDogStatsdOptions{} + flagSet := NewNSQDDogStatsdContribFlags(dogStatsdOpts) + + flagSet.Parse(contribOpts) + n.Logf(nsqd.LOG_INFO, "Parsed Options: %+v", dogStatsdOpts) + + // pass the dogstats specific opts on + return &NSQDDogStatsd{ + opts: dogStatsdOpts, + nsqd: n, } } type NSQDDogStatsd struct { nsqd *nsqd.NSQD - contribOpts *NSQDContribOptions + opts *NSQDDogStatsdOptions +} + + +// DD specific logger. Currently delegates to nsqd, but will change shortly... +func (dd *NSQDDogStatsd) logf(level lg.LogLevel, f string, args ...interface{}) { + dd.nsqd.Logf(level, f, args...) } -func (dd *NSQDDogStatsd) Active() bool { - if dd.contribOpts.DogStatsdAddress != "" { +func (dd *NSQDDogStatsd) Enabled() bool { + dd.logf(nsqd.LOG_INFO, "%+v", dd.opts) + if dd.opts.DogStatsdAddress != "" { return true } else { return false @@ -43,9 +85,9 @@ func (dd *NSQDDogStatsd) Loop() { var lastStats []nsqd.TopicStats var stat string - ticker := time.NewTicker(dd.contribOpts.DogStatsdInterval) + ticker := time.NewTicker(dd.opts.DogStatsdInterval) - dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") + dd.logf(nsqd.LOG_DEBUG, "Loop started") exitChan := *dd.nsqd.ExitChan() for { @@ -53,19 +95,19 @@ func (dd *NSQDDogStatsd) Loop() { case <-exitChan: goto exit case <-ticker.C: - dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING") + dd.logf(nsqd.LOG_DEBUG, "LOOPING") client := NewDataDogClient( - dd.contribOpts.DogStatsdAddress, - dd.contribOpts.DogStatsdPrefix, + dd.opts.DogStatsdAddress, + dd.opts.DogStatsdPrefix, ) err := client.CreateSocket() if err != nil { - dd.nsqd.Logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) + dd.logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) continue } - dd.nsqd.Logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) + dd.logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) stats := dd.nsqd.GetStats() for _, topic := range stats { diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 7ff0d2025..6d14c9af6 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -1,60 +1,44 @@ package contrib import ( - "flag" "github.com/nsqio/nsq/nsqd" "log" "os" - "time" ) var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) type INSQDAddon interface { - Active() bool + Enabled() bool Start() } -type NSQDContribOptions struct { - *NSQDDogStatsdOptions -} - -// Instantiates all contrib default options -func NewContribOptions() *NSQDContribOptions { - return &NSQDContribOptions{ - NewNSQDDogStatsdDefaultOptions(), - } -} - -func AddNSQDContribFlags(opts *NSQDContribOptions, flagSet *flag.FlagSet) { - flagSet.String("dogstatsd-address", opts.DogStatsdAddress, "UDP : of a statsd daemon for pushing stats") - flagSet.Duration("dogstatsd-interval", opts.DogStatsdInterval, "duration between pushing to dogstatsd") - // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") - flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)") -} - type NSQDAddons struct { addons []INSQDAddon } // Starts all addons that are active func (as *NSQDAddons) Start() { - logger.Println("Starting All addons") + logger.Println("Starting All Enabled Addons: %+v", as.addons) for _, addon := range as.addons { - if addon.Active() { - addon.Start() - } + addon.Start() } } -func NewNSQDAddons(contribOpts *NSQDContribOptions, nsqd *nsqd.NSQD) *NSQDAddons { +// Initializes addons that have options set +func NewEnabledNSQDAddons(contribOpts []string, nsqd *nsqd.NSQD) *NSQDAddons { + var activeAddons []INSQDAddon + + logger.Println(contribOpts) + + // ask each addon if it should be initialize + dogStats := NewNSQDDogStatsd(contribOpts, nsqd) + if dogStats.Enabled() { + activeAddons = append(activeAddons, dogStats) + } + return &NSQDAddons{ - addons: []INSQDAddon{ - &NSQDDogStatsd{ - contribOpts: contribOpts, - nsqd: nsqd, - }, - }, + addons: activeAddons, } } diff --git a/nsqd/options.go b/nsqd/options.go index ee461edb9..9157d810e 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -79,6 +79,9 @@ type Options struct { DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` + + // Contrib/Optional Modules + ModOpt []string `flag:"mod-opt"` } func NewOptions() *Options { @@ -141,5 +144,7 @@ func NewOptions() *Options { SnappyEnabled: true, TLSMinVersion: tls.VersionTLS10, + + ModOpt: make([]string, 0), } } From 569ad0e854f9cc7b4c42cab4c34a74cc0b33bc5f Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 13:35:08 -0400 Subject: [PATCH 08/22] nsq_contrib: begins to start using lg.AppLogFunc --- apps/nsqd/nsqd.go | 5 ++--- contrib/dogstatsd.go | 17 ++++++----------- contrib/nsqd.go | 13 ++++++------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index c5a17285c..f53195e22 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -16,10 +16,10 @@ import ( "github.com/BurntSushi/toml" "github.com/judwhite/go-svc/svc" "github.com/mreiferson/go-options" + "github.com/nsqio/nsq/contrib" "github.com/nsqio/nsq/internal/app" "github.com/nsqio/nsq/internal/version" "github.com/nsqio/nsq/nsqd" - "github.com/nsqio/nsq/contrib" ) type tlsRequiredOption int @@ -145,7 +145,6 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)") flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)") - optModulesOptions := app.StringArray{} flagSet.Var(&optModulesOptions, "mod-opt", "optional module options, of form: --mod-opt={{moduleName}}={{moduleOpt}}={{moduleOptValue}}") @@ -240,7 +239,7 @@ func (p *program) Start() error { nsqd.Main() // hook into addons - addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd) + addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd, nsqd.Logf) addons.Start() p.nsqd = nsqd diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index 5b6da40ba..c16eb20d9 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -38,31 +38,27 @@ func NewNSQDDogStatsdContribFlags(opts *NSQDDogStatsdOptions) *flag.FlagSet { return flagSet } -func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD) INSQDAddon { - n.Logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) +func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD, logf lg.AppLogFunc) INSQDAddon { + logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) dogStatsdOpts := &NSQDDogStatsdOptions{} flagSet := NewNSQDDogStatsdContribFlags(dogStatsdOpts) flagSet.Parse(contribOpts) - n.Logf(nsqd.LOG_INFO, "Parsed Options: %+v", dogStatsdOpts) + logf(nsqd.LOG_INFO, "Parsed Options: %+v", dogStatsdOpts) // pass the dogstats specific opts on return &NSQDDogStatsd{ opts: dogStatsdOpts, nsqd: n, + logf: logf, } } type NSQDDogStatsd struct { nsqd *nsqd.NSQD opts *NSQDDogStatsdOptions -} - - -// DD specific logger. Currently delegates to nsqd, but will change shortly... -func (dd *NSQDDogStatsd) logf(level lg.LogLevel, f string, args ...interface{}) { - dd.nsqd.Logf(level, f, args...) + logf lg.AppLogFunc } func (dd *NSQDDogStatsd) Enabled() bool { @@ -75,8 +71,7 @@ func (dd *NSQDDogStatsd) Enabled() bool { } func (dd *NSQDDogStatsd) Start() { - logger.Println("Starting nsqd datadog") - + dd.logf(nsqd.LOG_INFO, "Starting Datadog NSQD Monitor") dd.nsqd.RegisterAddon(dd.Loop) } diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 6d14c9af6..43e955fa6 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -2,11 +2,9 @@ package contrib import ( "github.com/nsqio/nsq/nsqd" - "log" - "os" + "github.com/nsqio/nsq/internal/lg" ) -var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) type INSQDAddon interface { Enabled() bool @@ -15,11 +13,11 @@ type INSQDAddon interface { type NSQDAddons struct { addons []INSQDAddon + logf lg.AppLogFunc } // Starts all addons that are active func (as *NSQDAddons) Start() { - logger.Println("Starting All Enabled Addons: %+v", as.addons) for _, addon := range as.addons { addon.Start() @@ -27,18 +25,19 @@ func (as *NSQDAddons) Start() { } // Initializes addons that have options set -func NewEnabledNSQDAddons(contribOpts []string, nsqd *nsqd.NSQD) *NSQDAddons { +func NewEnabledNSQDAddons(contribOpts []string, n *nsqd.NSQD, logf lg.AppLogFunc) *NSQDAddons { var activeAddons []INSQDAddon - logger.Println(contribOpts) + logf(nsqd.LOG_INFO, "Addons Initializing") // ask each addon if it should be initialize - dogStats := NewNSQDDogStatsd(contribOpts, nsqd) + dogStats := NewNSQDDogStatsd(contribOpts, n, logf) if dogStats.Enabled() { activeAddons = append(activeAddons, dogStats) } return &NSQDAddons{ addons: activeAddons, + logf: logf, } } From 9e67b56cae9e84544c4938a2a6a2cddae4448c78 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 14:53:56 -0400 Subject: [PATCH 09/22] nsq_contrib: datadog tags are an array for deterministic tests --- contrib/datadog_client.go | 11 ++++-- contrib/datadog_client_test.go | 38 ++++++++++++++++++ contrib/dogstatsd.go | 71 +++++++++++++++++++--------------- contrib/nsqd_test.go | 34 ++++++++++++++++ 4 files changed, 120 insertions(+), 34 deletions(-) create mode 100644 contrib/nsqd_test.go diff --git a/contrib/datadog_client.go b/contrib/datadog_client.go index bd1446c99..aca43db27 100644 --- a/contrib/datadog_client.go +++ b/contrib/datadog_client.go @@ -14,16 +14,21 @@ type DataDogClient struct { prefix string } +type DataDogTag struct { + k string + v string +} + type DataDogTags struct { - tags map[string]string + tags []*DataDogTag } // returns dogstatd compatible string // "#tag1:value1,tag2:value2 func (ddt *DataDogTags) String() string { ts := []string{} - for k, v := range ddt.tags { - ts = append(ts, fmt.Sprintf("%s:%s", k, v)) + for _, tag := range ddt.tags { + ts = append(ts, fmt.Sprintf("%s:%s", tag.k, tag.v)) } return "#" + strings.Join(ts, ",") } diff --git a/contrib/datadog_client_test.go b/contrib/datadog_client_test.go index 9e5ee6c24..da0c765ae 100644 --- a/contrib/datadog_client_test.go +++ b/contrib/datadog_client_test.go @@ -1 +1,39 @@ package contrib + +import ( + "testing" + "github.com/nsqio/nsq/internal/test" +) + +func TestDDTagsStringNoTags(t *testing.T) { + test.Equal( + t, + (&DataDogTags{}).String(), + "#", + ) +} + +func TestDDTagsStringSingleString(t *testing.T) { + test.Equal( + t, + (&DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: "test_topic"}, + }, + }).String(), + "#topic_name:test_topic", + ) +} + +func TestDDTagsStringMultipleStrings(t *testing.T) { + test.Equal( + t, + (&DataDogTags{ + tags: []*DataDogTag{ + {k: "topic_name", v: "test_topic"}, + {k: "channel_name", v: "test_channel"}, + }, + }).String(), + "#topic_name:test_topic,channel_name:test_channel", + ) +} diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index c16eb20d9..2356b4fde 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -116,16 +116,23 @@ func (dd *NSQDDogStatsd) Loop() { } diff := topic.MessageCount - lastTopic.MessageCount + // can topics/channels have commas in their names? client.Incr("message_count", int64(diff), &DataDogTags{ - tags: map[string]string{"topic_name": topic.TopicName}, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, }) client.Gauge("topic.depth", topic.Depth, &DataDogTags{ - tags: map[string]string{"topic_name": topic.TopicName}, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, }) client.Gauge("topic.backend_depth", topic.BackendDepth, &DataDogTags{ - tags: map[string]string{"topic_name": topic.TopicName}, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, }) for _, item := range topic.E2eProcessingLatency.Percentiles { @@ -134,7 +141,9 @@ func (dd *NSQDDogStatsd) Loop() { // minimum resolution we will have, so there is no loss of // accuracy client.Gauge(stat, int64(item["value"]), &DataDogTags{ - tags: map[string]string{"topic_name": topic.TopicName}, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + }, }) } @@ -149,74 +158,74 @@ func (dd *NSQDDogStatsd) Loop() { } diff := channel.MessageCount - lastChannel.MessageCount client.Incr("channel.message_count", int64(diff), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) client.Gauge("channel.depth", channel.Depth, &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) client.Gauge("channel.backend_depth", channel.BackendDepth, &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) // stat = fmt.Sprintf("topic.%s.channel.%s.in_flight_count", topic.TopicName, channel.ChannelName) client.Gauge("channel.in_flight_count", int64(channel.InFlightCount), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) // stat = fmt.Sprintf("topic.%s.channel.%s.deferred_count", topic.TopicName, channel.ChannelName) client.Gauge("channel.deferred_count", int64(channel.DeferredCount), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) diff = channel.RequeueCount - lastChannel.RequeueCount // stat = fmt.Sprintf("topic.%s.channel.%s.requeue_count", topic.TopicName, channel.ChannelName) client.Incr("channel.requeue_count", int64(diff), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) diff = channel.TimeoutCount - lastChannel.TimeoutCount // stat = fmt.Sprintf("topic.%s.channel.%s.timeout_count", topic.TopicName, channel.ChannelName) client.Incr("channel.timeout_count", int64(diff), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) // stat = fmt.Sprintf("topic.%s.channel.%s.clients", topic.TopicName, channel.ChannelName) client.Gauge("channel.clients", int64(len(channel.Clients)), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) for _, item := range channel.E2eProcessingLatency.Percentiles { stat = fmt.Sprintf("channel.e2e_processing_latency_%.0f", item["quantile"]*100.0) client.Gauge(stat, int64(item["value"]), &DataDogTags{ - tags: map[string]string{ - "topic_name": topic.TopicName, - "channel_name": channel.ChannelName, + tags: []*DataDogTag{ + {k: "topic_name", v: topic.TopicName}, + {k: "channel_name", v: channel.ChannelName}, }, }) } diff --git a/contrib/nsqd_test.go b/contrib/nsqd_test.go new file mode 100644 index 000000000..2c4c2558d --- /dev/null +++ b/contrib/nsqd_test.go @@ -0,0 +1,34 @@ +package contrib + +import ( + "testing" + "github.com/nsqio/nsq/internal/test" +) + + + +type TestAddon struct { + numStartCalls int +} + +func (ta *TestAddon) Start() { + ta.numStartCalls += 1 +} + +func (ta *TestAddon) Enabled() bool { + return true +} + + +func TestStartMultipleAddons(t *testing.T) { + ta1 := &TestAddon{} + ta2 := &TestAddon{} + + as := &NSQDAddons{ + addons: []INSQDAddon{ta1, ta2}, + } + as.Start() + + test.Equal(t, ta1.numStartCalls, 1) + test.Equal(t, ta2.numStartCalls, 1) +} \ No newline at end of file From 13303ad0f7cc7de9fe0db59272f0d41510ff7b0c Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 16:06:46 -0400 Subject: [PATCH 10/22] datadog_contrib: net.Conn test! for the send method --- contrib/datadog_client_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/contrib/datadog_client_test.go b/contrib/datadog_client_test.go index da0c765ae..4b8d51448 100644 --- a/contrib/datadog_client_test.go +++ b/contrib/datadog_client_test.go @@ -3,6 +3,7 @@ package contrib import ( "testing" "github.com/nsqio/nsq/internal/test" + "net" ) func TestDDTagsStringNoTags(t *testing.T) { @@ -37,3 +38,21 @@ func TestDDTagsStringMultipleStrings(t *testing.T) { "#topic_name:test_topic,channel_name:test_channel", ) } + +func TestDDCSend(t *testing.T) { + r, w := net.Pipe() + b := make([]byte, len("nsq.topic.depth:100|t|#")) + + go func() { + ddc := &DataDogClient{ + conn: w, + addr: "test", + prefix: "nsq.", + } + testValue := int64(100) + ddc.send("topic.depth", "%d|t", testValue, &DataDogTags{}) + }() + + r.Read(b) + test.Equal(t, string(b), "nsq.topic.depth:100|t|#") +} From bcbe6daf96f517863ffbb4686febd894a70130dd Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 16:40:08 -0400 Subject: [PATCH 11/22] contrib_dogstats: test file and Enabled() tests --- contrib/dogstatsd_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 contrib/dogstatsd_test.go diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go new file mode 100644 index 000000000..6ce4fc8d2 --- /dev/null +++ b/contrib/dogstatsd_test.go @@ -0,0 +1,34 @@ +package contrib + +import ( + "testing" + "github.com/nsqio/nsq/internal/test" + "log" + "github.com/nsqio/nsq/internal/lg" + "os" +) + +func logf(level lg.LogLevel, f string, args ...interface{}) { + logger := log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) + lg.Logf(logger, lg.DEBUG, level, f, args...) +} + +func TestEnabledTrueWhenAddressPresent(t *testing.T) { + + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{ + DogStatsdAddress: "test.com.org", + }, + logf: logf, + } + test.Equal(t, dd.Enabled(), true) +} + +func TestEnabledFalseWhenAddressAbsent(t *testing.T) { + + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{}, + logf: logf, + } + test.Equal(t, dd.Enabled(), false) +} From 86b73d08431d424c60dd4233ea99930c010ceacf Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 16:48:36 -0400 Subject: [PATCH 12/22] contrib_dogstats: * stubbed out more opts related tests * removed modified whitespace from nsqd.go --- apps/nsqd/nsqd.go | 2 -- contrib/dogstatsd_test.go | 11 +++++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index f53195e22..945871bdc 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -204,7 +204,6 @@ func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) - flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) @@ -225,7 +224,6 @@ func (p *program) Start() error { cfg.Validate() options.Resolve(opts, flagSet, cfg) - nsqd := nsqd.New(opts) err := nsqd.LoadMetadata() diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go index 6ce4fc8d2..b5b478f27 100644 --- a/contrib/dogstatsd_test.go +++ b/contrib/dogstatsd_test.go @@ -32,3 +32,14 @@ func TestEnabledFalseWhenAddressAbsent(t *testing.T) { } test.Equal(t, dd.Enabled(), false) } + +func TestFlagsParsedSuccess(t *testing.T) { + t.Fail() +} + +// Tests that no opts are parsed when the - prefix is missing from the module +// opts. The - is required because the optional module opts list is passed directly +// back to flags.Parse() +func TestFlagsMissingDashPrefix(t *testing.T) { + t.Fail() +} From 7f8d6d2026df7f31285b4ec185c9fc8b39bbba6d Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 17 Jul 2017 17:14:35 -0400 Subject: [PATCH 13/22] contrib_dogstats: removed passing public Logf, and delegates to NSQD's --- contrib/dogstatsd.go | 21 ++++++++--------- contrib/dogstatsd_test.go | 47 ++++++++++++++++++++++----------------- contrib/nsqd.go | 9 +++----- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index 2356b4fde..f3eaf9747 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -5,7 +5,6 @@ import ( "github.com/nsqio/nsq/nsqd" "time" "flag" - "github.com/nsqio/nsq/internal/lg" ) type NSQDDogStatsdOptions struct { @@ -38,31 +37,29 @@ func NewNSQDDogStatsdContribFlags(opts *NSQDDogStatsdOptions) *flag.FlagSet { return flagSet } -func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD, logf lg.AppLogFunc) INSQDAddon { - logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) +func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD) INSQDAddon { + n.Logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) dogStatsdOpts := &NSQDDogStatsdOptions{} flagSet := NewNSQDDogStatsdContribFlags(dogStatsdOpts) flagSet.Parse(contribOpts) - logf(nsqd.LOG_INFO, "Parsed Options: %+v", dogStatsdOpts) + n.Logf(nsqd.LOG_INFO, "Parsed Options: %+v", dogStatsdOpts) // pass the dogstats specific opts on return &NSQDDogStatsd{ opts: dogStatsdOpts, nsqd: n, - logf: logf, } } type NSQDDogStatsd struct { nsqd *nsqd.NSQD opts *NSQDDogStatsdOptions - logf lg.AppLogFunc } func (dd *NSQDDogStatsd) Enabled() bool { - dd.logf(nsqd.LOG_INFO, "%+v", dd.opts) + dd.nsqd.Logf(nsqd.LOG_INFO, "%+v", dd.opts) if dd.opts.DogStatsdAddress != "" { return true } else { @@ -71,7 +68,7 @@ func (dd *NSQDDogStatsd) Enabled() bool { } func (dd *NSQDDogStatsd) Start() { - dd.logf(nsqd.LOG_INFO, "Starting Datadog NSQD Monitor") + dd.nsqd.Logf(nsqd.LOG_INFO, "Starting Datadog NSQD Monitor") dd.nsqd.RegisterAddon(dd.Loop) } @@ -82,7 +79,7 @@ func (dd *NSQDDogStatsd) Loop() { ticker := time.NewTicker(dd.opts.DogStatsdInterval) - dd.logf(nsqd.LOG_DEBUG, "Loop started") + dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") exitChan := *dd.nsqd.ExitChan() for { @@ -90,7 +87,7 @@ func (dd *NSQDDogStatsd) Loop() { case <-exitChan: goto exit case <-ticker.C: - dd.logf(nsqd.LOG_DEBUG, "LOOPING") + dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING") client := NewDataDogClient( dd.opts.DogStatsdAddress, @@ -98,11 +95,11 @@ func (dd *NSQDDogStatsd) Loop() { ) err := client.CreateSocket() if err != nil { - dd.logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) + dd.nsqd.Logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client) continue } - dd.logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) + dd.nsqd.Logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client) stats := dd.nsqd.GetStats() for _, topic := range stats { diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go index b5b478f27..c989d47d4 100644 --- a/contrib/dogstatsd_test.go +++ b/contrib/dogstatsd_test.go @@ -3,34 +3,41 @@ package contrib import ( "testing" "github.com/nsqio/nsq/internal/test" - "log" - "github.com/nsqio/nsq/internal/lg" - "os" + "github.com/nsqio/nsq/nsqd" ) -func logf(level lg.LogLevel, f string, args ...interface{}) { - logger := log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds) - lg.Logf(logger, lg.DEBUG, level, f, args...) -} - func TestEnabledTrueWhenAddressPresent(t *testing.T) { + t.Fail() - dd := &NSQDDogStatsd{ - opts: &NSQDDogStatsdOptions{ - DogStatsdAddress: "test.com.org", - }, - logf: logf, - } - test.Equal(t, dd.Enabled(), true) + //n := nsqd.New(&nsqd.Options{ + // LogLevel: "debug", + // MaxDeflateLevel: 1, + //}) + // + //dd := &NSQDDogStatsd{ + // opts: &NSQDDogStatsdOptions{ + // DogStatsdAddress: "test.com.org", + // }, + // nsqd: n, + //} + //test.Equal(t, dd.Enabled(), true) + // } func TestEnabledFalseWhenAddressAbsent(t *testing.T) { + t.Fail() - dd := &NSQDDogStatsd{ - opts: &NSQDDogStatsdOptions{}, - logf: logf, - } - test.Equal(t, dd.Enabled(), false) + //n := nsqd.New(&nsqd.Options{ + // LogLevel: "debug", + // MaxDeflateLevel: 1, + //}) + // + //dd := &NSQDDogStatsd{ + // opts: &NSQDDogStatsdOptions{}, + // nsqd: n, + //} + //test.Equal(t, dd.Enabled(), false) + // } func TestFlagsParsedSuccess(t *testing.T) { diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 43e955fa6..0f68e7f42 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -2,7 +2,6 @@ package contrib import ( "github.com/nsqio/nsq/nsqd" - "github.com/nsqio/nsq/internal/lg" ) @@ -13,7 +12,6 @@ type INSQDAddon interface { type NSQDAddons struct { addons []INSQDAddon - logf lg.AppLogFunc } // Starts all addons that are active @@ -25,19 +23,18 @@ func (as *NSQDAddons) Start() { } // Initializes addons that have options set -func NewEnabledNSQDAddons(contribOpts []string, n *nsqd.NSQD, logf lg.AppLogFunc) *NSQDAddons { +func NewEnabledNSQDAddons(contribOpts []string, n *nsqd.NSQD) *NSQDAddons { var activeAddons []INSQDAddon - logf(nsqd.LOG_INFO, "Addons Initializing") + n.Logf(nsqd.LOG_INFO, "Addons Initializing") // ask each addon if it should be initialize - dogStats := NewNSQDDogStatsd(contribOpts, n, logf) + dogStats := NewNSQDDogStatsd(contribOpts, n) if dogStats.Enabled() { activeAddons = append(activeAddons, dogStats) } return &NSQDAddons{ addons: activeAddons, - logf: logf, } } From 852552c40aa4eec0359320901dde3ac66a77e62a Mon Sep 17 00:00:00 2001 From: daniel mican Date: Sat, 22 Jul 2017 21:18:13 -0400 Subject: [PATCH 14/22] contrib_dogstatsd remove explicit logger function invocation of addons --- apps/nsqd/nsqd.go | 2 +- contrib/dogstatsd_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/nsqd/nsqd.go b/apps/nsqd/nsqd.go index 945871bdc..1b4fa66b2 100644 --- a/apps/nsqd/nsqd.go +++ b/apps/nsqd/nsqd.go @@ -237,7 +237,7 @@ func (p *program) Start() error { nsqd.Main() // hook into addons - addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd, nsqd.Logf) + addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd) addons.Start() p.nsqd = nsqd diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go index c989d47d4..8609300f7 100644 --- a/contrib/dogstatsd_test.go +++ b/contrib/dogstatsd_test.go @@ -2,8 +2,8 @@ package contrib import ( "testing" - "github.com/nsqio/nsq/internal/test" - "github.com/nsqio/nsq/nsqd" + _"github.com/nsqio/nsq/internal/test" + _"github.com/nsqio/nsq/nsqd" ) func TestEnabledTrueWhenAddressPresent(t *testing.T) { From d7271129b7efaf4131670b8a0f94eb210de37bcd Mon Sep 17 00:00:00 2001 From: daniel mican Date: Fri, 28 Jul 2017 15:20:47 -0400 Subject: [PATCH 15/22] nsqd.RegisterAddon -> nsqd.AddModuleGoroutine --- contrib/dogstatsd.go | 5 ++--- nsqd/nsqd.go | 8 ++------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index f3eaf9747..f5d2b4405 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -69,10 +69,10 @@ func (dd *NSQDDogStatsd) Enabled() bool { func (dd *NSQDDogStatsd) Start() { dd.nsqd.Logf(nsqd.LOG_INFO, "Starting Datadog NSQD Monitor") - dd.nsqd.RegisterAddon(dd.Loop) + dd.nsqd.AddModuleGoroutine(dd.Loop) } -func (dd *NSQDDogStatsd) Loop() { +func (dd *NSQDDogStatsd) Loop(exitChan chan int) { // var lastMemStats *nsqd.memStats var lastStats []nsqd.TopicStats var stat string @@ -80,7 +80,6 @@ func (dd *NSQDDogStatsd) Loop() { ticker := time.NewTicker(dd.opts.DogStatsdInterval) dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started") - exitChan := *dd.nsqd.ExitChan() for { select { diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index fe86757b0..5ebbd9e29 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -154,10 +154,6 @@ func New(opts *Options) *NSQD { return n } -func (n *NSQD) ExitChan() *chan int { - return &n.exitChan -} - func (n *NSQD) getOpts() *Options { return n.opts.Load().(*Options) } @@ -269,8 +265,8 @@ func (n *NSQD) Main() { } } -func (n *NSQD) RegisterAddon(addonFn func()) { - n.waitGroup.Wrap(func() { addonFn() }) +func (n *NSQD) AddModuleGoroutine(addonFn func(exitChan chan int)) { + n.waitGroup.Wrap(func() { addonFn(n.exitChan) }) } type meta struct { From f59a2e5a84f37dc267dd042d90125bdad26a4031 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Fri, 28 Jul 2017 15:54:53 -0400 Subject: [PATCH 16/22] dogstatsd contrib boosting test coverage --- contrib/datadog_client_test.go | 6 ++-- contrib/dogstatsd.go | 10 +++--- contrib/dogstatsd_test.go | 64 ++++++++++++++++------------------ contrib/nsqd.go | 8 ++++- contrib/nsqd_test.go | 7 ++-- 5 files changed, 48 insertions(+), 47 deletions(-) diff --git a/contrib/datadog_client_test.go b/contrib/datadog_client_test.go index 4b8d51448..e7ad3b13a 100644 --- a/contrib/datadog_client_test.go +++ b/contrib/datadog_client_test.go @@ -1,9 +1,9 @@ package contrib import ( - "testing" "github.com/nsqio/nsq/internal/test" "net" + "testing" ) func TestDDTagsStringNoTags(t *testing.T) { @@ -45,8 +45,8 @@ func TestDDCSend(t *testing.T) { go func() { ddc := &DataDogClient{ - conn: w, - addr: "test", + conn: w, + addr: "test", prefix: "nsq.", } testValue := int64(100) diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index f5d2b4405..e9cf6cb40 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -1,10 +1,10 @@ package contrib import ( + "flag" "fmt" "github.com/nsqio/nsq/nsqd" "time" - "flag" ) type NSQDDogStatsdOptions struct { @@ -24,7 +24,7 @@ func NewNSQDDogStatsdContribFlags(opts *NSQDDogStatsdOptions) *flag.FlagSet { flagSet.DurationVar( &opts.DogStatsdInterval, "dogstatsd-interval", - 10 * time.Second, + 10*time.Second, "duration between pushing to dogstatsd", ) // flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd") @@ -37,7 +37,7 @@ func NewNSQDDogStatsdContribFlags(opts *NSQDDogStatsdOptions) *flag.FlagSet { return flagSet } -func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD) INSQDAddon { +func NewNSQDDogStatsd(contribOpts []string, n INSQD) INSQDAddon { n.Logf(nsqd.LOG_INFO, "Received options: %+v", contribOpts) dogStatsdOpts := &NSQDDogStatsdOptions{} @@ -49,12 +49,12 @@ func NewNSQDDogStatsd(contribOpts []string, n *nsqd.NSQD) INSQDAddon { // pass the dogstats specific opts on return &NSQDDogStatsd{ opts: dogStatsdOpts, - nsqd: n, + nsqd: n, } } type NSQDDogStatsd struct { - nsqd *nsqd.NSQD + nsqd INSQD opts *NSQDDogStatsdOptions } diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go index 8609300f7..3f85d5599 100644 --- a/contrib/dogstatsd_test.go +++ b/contrib/dogstatsd_test.go @@ -1,52 +1,50 @@ -package contrib +package contrib import ( + "github.com/nsqio/nsq/internal/lg" + "github.com/nsqio/nsq/internal/test" + "github.com/nsqio/nsq/nsqd" "testing" - _"github.com/nsqio/nsq/internal/test" - _"github.com/nsqio/nsq/nsqd" ) +type StubNSQD struct{} + +func (n *StubNSQD) Logf(level lg.LogLevel, f string, args ...interface{}) {} +func (n *StubNSQD) GetStats() []nsqd.TopicStats { + return []nsqd.TopicStats{} +} +func (n *StubNSQD) AddModuleGoroutine(addonFn func(exitChan chan int)) {} + func TestEnabledTrueWhenAddressPresent(t *testing.T) { - t.Fail() - - //n := nsqd.New(&nsqd.Options{ - // LogLevel: "debug", - // MaxDeflateLevel: 1, - //}) - // - //dd := &NSQDDogStatsd{ - // opts: &NSQDDogStatsdOptions{ - // DogStatsdAddress: "test.com.org", - // }, - // nsqd: n, - //} - //test.Equal(t, dd.Enabled(), true) - // + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{ + DogStatsdAddress: "test.com.org", + }, + nsqd: &StubNSQD{}, + } + test.Equal(t, dd.Enabled(), true) + } func TestEnabledFalseWhenAddressAbsent(t *testing.T) { - t.Fail() - - //n := nsqd.New(&nsqd.Options{ - // LogLevel: "debug", - // MaxDeflateLevel: 1, - //}) - // - //dd := &NSQDDogStatsd{ - // opts: &NSQDDogStatsdOptions{}, - // nsqd: n, - //} - //test.Equal(t, dd.Enabled(), false) - // + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{}, + nsqd: &StubNSQD{}, + } + test.Equal(t, dd.Enabled(), false) } func TestFlagsParsedSuccess(t *testing.T) { - t.Fail() + opts := []string{"-dogstatsd-address", "127.0.0.1:8125"} + addon := NewNSQDDogStatsd(opts, &StubNSQD{}) + test.Equal(t, addon.(*NSQDDogStatsd).opts.DogStatsdAddress, "127.0.0.1:8125") } // Tests that no opts are parsed when the - prefix is missing from the module // opts. The - is required because the optional module opts list is passed directly // back to flags.Parse() func TestFlagsMissingDashPrefix(t *testing.T) { - t.Fail() + opts := []string{"dogstatsd-address", "127.0.0.1:8125"} + addon := NewNSQDDogStatsd(opts, &StubNSQD{}) + test.Equal(t, addon.(*NSQDDogStatsd).opts.DogStatsdAddress, "") } diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 0f68e7f42..702def216 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -1,9 +1,15 @@ package contrib import ( + "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/nsqd" ) +type INSQD interface { + Logf(level lg.LogLevel, f string, args ...interface{}) + GetStats() []nsqd.TopicStats + AddModuleGoroutine(addonFn func(exitChan chan int)) +} type INSQDAddon interface { Enabled() bool @@ -23,7 +29,7 @@ func (as *NSQDAddons) Start() { } // Initializes addons that have options set -func NewEnabledNSQDAddons(contribOpts []string, n *nsqd.NSQD) *NSQDAddons { +func NewEnabledNSQDAddons(contribOpts []string, n INSQD) *NSQDAddons { var activeAddons []INSQDAddon n.Logf(nsqd.LOG_INFO, "Addons Initializing") diff --git a/contrib/nsqd_test.go b/contrib/nsqd_test.go index 2c4c2558d..437f211ed 100644 --- a/contrib/nsqd_test.go +++ b/contrib/nsqd_test.go @@ -1,12 +1,10 @@ package contrib import ( - "testing" "github.com/nsqio/nsq/internal/test" + "testing" ) - - type TestAddon struct { numStartCalls int } @@ -19,7 +17,6 @@ func (ta *TestAddon) Enabled() bool { return true } - func TestStartMultipleAddons(t *testing.T) { ta1 := &TestAddon{} ta2 := &TestAddon{} @@ -31,4 +28,4 @@ func TestStartMultipleAddons(t *testing.T) { test.Equal(t, ta1.numStartCalls, 1) test.Equal(t, ta2.numStartCalls, 1) -} \ No newline at end of file +} From 66de333023fb0c748706aeecc333cac517990ae3 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Fri, 28 Jul 2017 16:12:35 -0400 Subject: [PATCH 17/22] dogstatsd contrib: more tests and readme --- contrib/README.md | 28 ++++++++++++++++++++++++++++ contrib/nsqd_test.go | 6 ++++++ 2 files changed, 34 insertions(+) create mode 100644 contrib/README.md diff --git a/contrib/README.md b/contrib/README.md new file mode 100644 index 000000000..884a9a03f --- /dev/null +++ b/contrib/README.md @@ -0,0 +1,28 @@ +## Optional/Contrib Modules + +Contrib modules are a way to add functionality to nsqd, in a decoupled way. + + +The modules currently available are: + +- Datadog + + +### Architecture + +Contrib modules are initialized by passing in `--mod-opt=` to nsqd. This may +be provided multiple times. An array of `mod-opt`s are then passed to the +contrib module initializer (during nsqd initialization). Each module is then +passed its options to see if valid options were provided, after which it is +initialized and added to the nsqd waitGroup. + + +### Datadog + +Datadog contrib module, reports nsqd statistics to a datadog daemon. The options +it exposes are: + +- `--mod-opt=-dogstatsd-address=` +- `--mod-opt=-dogstatsd-interval=` +- `--mod-opt=-dogstatsd-prefix=` + diff --git a/contrib/nsqd_test.go b/contrib/nsqd_test.go index 437f211ed..8c4ea61bf 100644 --- a/contrib/nsqd_test.go +++ b/contrib/nsqd_test.go @@ -29,3 +29,9 @@ func TestStartMultipleAddons(t *testing.T) { test.Equal(t, ta1.numStartCalls, 1) test.Equal(t, ta2.numStartCalls, 1) } + +func TestNewEnabledNSQDAddonsNoAddons(t *testing.T) { + var opts []string + addons := NewEnabledNSQDAddons(opts, &StubNSQD{}) + test.Equal(t, addons.addons, []INSQDAddon(nil)) +} From e176b8ace48b3c062863b14520ac7e2a618e1b7b Mon Sep 17 00:00:00 2001 From: daniel mican Date: Sat, 29 Jul 2017 22:43:50 -0400 Subject: [PATCH 18/22] dogstats contrib: main loop UDP integration test --- contrib/dogstatsd.go | 9 ++++- contrib/dogstatsd_test.go | 85 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/contrib/dogstatsd.go b/contrib/dogstatsd.go index e9cf6cb40..9146d3cb4 100644 --- a/contrib/dogstatsd.go +++ b/contrib/dogstatsd.go @@ -54,8 +54,9 @@ func NewNSQDDogStatsd(contribOpts []string, n INSQD) INSQDAddon { } type NSQDDogStatsd struct { - nsqd INSQD - opts *NSQDDogStatsdOptions + nsqd INSQD + opts *NSQDDogStatsdOptions + singleLoop bool } func (dd *NSQDDogStatsd) Enabled() bool { @@ -229,6 +230,10 @@ func (dd *NSQDDogStatsd) Loop(exitChan chan int) { } lastStats = stats client.Close() + + if dd.singleLoop { + goto exit + } } } diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go index 3f85d5599..5bc5a8202 100644 --- a/contrib/dogstatsd_test.go +++ b/contrib/dogstatsd_test.go @@ -1,17 +1,35 @@ package contrib import ( + "bytes" + "fmt" "github.com/nsqio/nsq/internal/lg" + "github.com/nsqio/nsq/internal/quantile" "github.com/nsqio/nsq/internal/test" "github.com/nsqio/nsq/nsqd" + "net" "testing" + "time" ) type StubNSQD struct{} -func (n *StubNSQD) Logf(level lg.LogLevel, f string, args ...interface{}) {} +func (n *StubNSQD) Logf(level lg.LogLevel, f string, args ...interface{}) { + fmt.Printf(f, args) +} func (n *StubNSQD) GetStats() []nsqd.TopicStats { - return []nsqd.TopicStats{} + return []nsqd.TopicStats{ + { + TopicName: "test", + E2eProcessingLatency: &quantile.Result{}, + Channels: []nsqd.ChannelStats{ + { + ChannelName: "test_channel", + E2eProcessingLatency: &quantile.Result{}, + }, + }, + }, + } } func (n *StubNSQD) AddModuleGoroutine(addonFn func(exitChan chan int)) {} @@ -48,3 +66,66 @@ func TestFlagsMissingDashPrefix(t *testing.T) { addon := NewNSQDDogStatsd(opts, &StubNSQD{}) test.Equal(t, addon.(*NSQDDogStatsd).opts.DogStatsdAddress, "") } + +func TestLoopSendsCorrectMessages(t *testing.T) { + // setup the UDP Server + conn, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 0, + }) + if err != nil { + panic(err) + } + defer conn.Close() + + // Start the DD loop to send status updates to the test server addr + go func(addr net.Addr) { + exitChan := make(chan int) + dd := &NSQDDogStatsd{ + opts: &NSQDDogStatsdOptions{ + DogStatsdAddress: addr.String(), + DogStatsdInterval: time.Second, + }, + nsqd: &StubNSQD{}, + singleLoop: true, + } + + dd.Loop(exitChan) + + }(conn.LocalAddr()) + + conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + + // read from server conn and assert all data was sent from datadog + // as expected + // Makes X Specific reads + cases := []struct { + Name string + A string + }{ + {Name: "message_count", A: "message_count:0|c|#topic_name:test"}, + {Name: "topic_depth", A: "topic.depth:0|g|#topic_name:test"}, + {Name: "backend_depth", A: "topic.backend_depth:0|g|#topic_name:test"}, + {Name: "channel.message_count", A: "channel.message_count:0|c|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.depth", A: "channel.depth:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.backend_depth", A: "channel.backend_depth:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.in_flight_count", A: "channel.in_flight_count:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.deferred_count", A: "channel.deferred_count:0|g|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.requeue_count", A: "channel.requeue_count:0|c|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.timeout_count", A: "channel.timeout_count:0|c|#topic_name:test,channel_name:test_channel"}, + {Name: "channel.clients", A: "channel.clients:0|g|#topic_name:test,channel_name:test_channel"}, + } + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + buffer := make([]byte, 128) + + _, _, err := conn.ReadFromUDP(buffer) + if err != nil { + panic(err) + } + fmt.Println(string(buffer)) + test.Equal(t, string(bytes.Trim(buffer, "\x00")), tc.A) + + }) + } +} From 9b72e380261f65fb5dc0dd740a55499027140ee9 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Sat, 29 Jul 2017 22:49:26 -0400 Subject: [PATCH 19/22] dogstats contrib: removed subtests to support golang 1.6 --- contrib/dogstatsd_test.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/contrib/dogstatsd_test.go b/contrib/dogstatsd_test.go index 5bc5a8202..5d3b1832c 100644 --- a/contrib/dogstatsd_test.go +++ b/contrib/dogstatsd_test.go @@ -116,16 +116,13 @@ func TestLoopSendsCorrectMessages(t *testing.T) { {Name: "channel.clients", A: "channel.clients:0|g|#topic_name:test,channel_name:test_channel"}, } for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - buffer := make([]byte, 128) + buffer := make([]byte, 128) - _, _, err := conn.ReadFromUDP(buffer) - if err != nil { - panic(err) - } - fmt.Println(string(buffer)) - test.Equal(t, string(bytes.Trim(buffer, "\x00")), tc.A) - - }) + _, _, err := conn.ReadFromUDP(buffer) + if err != nil { + panic(err) + } + fmt.Println(string(buffer)) + test.Equal(t, string(bytes.Trim(buffer, "\x00")), tc.A) } } From 6f9b481fe19f0d9607b5ecbccb4b853596a8066a Mon Sep 17 00:00:00 2001 From: daniel mican Date: Sun, 30 Jul 2017 21:01:40 -0400 Subject: [PATCH 20/22] dogstatsd contrib: initializer table --- contrib/nsqd.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/contrib/nsqd.go b/contrib/nsqd.go index 702def216..f969bb0c2 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -3,6 +3,7 @@ package contrib import ( "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/nsqd" + "strings" ) type INSQD interface { @@ -11,6 +12,8 @@ type INSQD interface { AddModuleGoroutine(addonFn func(exitChan chan int)) } +type initializer func([]string, INSQD) INSQDAddon + type INSQDAddon interface { Enabled() bool Start() @@ -31,13 +34,31 @@ func (as *NSQDAddons) Start() { // Initializes addons that have options set func NewEnabledNSQDAddons(contribOpts []string, n INSQD) *NSQDAddons { var activeAddons []INSQDAddon + var hasOpt bool + + initializers := map[string]initializer{ + "dogstatsd": NewNSQDDogStatsd, + } n.Logf(nsqd.LOG_INFO, "Addons Initializing") - // ask each addon if it should be initialize - dogStats := NewNSQDDogStatsd(contribOpts, n) - if dogStats.Enabled() { - activeAddons = append(activeAddons, dogStats) + for k, initializer := range initializers { + // check if any of the options contains this addon's expected argument + hasOpt = false + + for _, opt := range contribOpts { + if strings.Contains(opt, k) { + hasOpt = true + break + } + } + + if hasOpt { + addon := initializer(contribOpts, n) + if addon.Enabled() { + activeAddons = append(activeAddons, addon) + } + } } return &NSQDAddons{ From 059888b6100f5762ff7a8a29c05c21df31db0240 Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 14 Aug 2017 08:59:08 -0400 Subject: [PATCH 21/22] dogstatsd_contrib: updated to only initialize contrib addons with filtered (valid) opts --- contrib/nsqd.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/contrib/nsqd.go b/contrib/nsqd.go index f969bb0c2..c2e60d9fa 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -31,33 +31,34 @@ func (as *NSQDAddons) Start() { } } +func optHasPrefix(opt string, prefix string) bool { + return strings.Index(opt, prefix) == 0 +} + // Initializes addons that have options set func NewEnabledNSQDAddons(contribOpts []string, n INSQD) *NSQDAddons { var activeAddons []INSQDAddon - var hasOpt bool initializers := map[string]initializer{ - "dogstatsd": NewNSQDDogStatsd, + "-dogstatsd": NewNSQDDogStatsd, } n.Logf(nsqd.LOG_INFO, "Addons Initializing") - for k, initializer := range initializers { + for prefix, initializer := range initializers { + validOpts := []string{} // check if any of the options contains this addon's expected argument - hasOpt = false - + // keeps track of all options starting with the correct prefix + // and initializes with the valid options for _, opt := range contribOpts { - if strings.Contains(opt, k) { - hasOpt = true - break + if optHasPrefix(opt, prefix) { + validOpts = append(validOpts, opt) } } - if hasOpt { - addon := initializer(contribOpts, n) - if addon.Enabled() { - activeAddons = append(activeAddons, addon) - } + addon := initializer(contribOpts, n) + if addon.Enabled() { + activeAddons = append(activeAddons, addon) } } From 5e2012ae268d7f3b00a9f44133b5b35ed044f71a Mon Sep 17 00:00:00 2001 From: daniel mican Date: Mon, 14 Aug 2017 09:10:29 -0400 Subject: [PATCH 22/22] dogstatsd_contrib: more exact match on filtering add on options --- contrib/nsqd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/nsqd.go b/contrib/nsqd.go index c2e60d9fa..b69c0760a 100644 --- a/contrib/nsqd.go +++ b/contrib/nsqd.go @@ -40,7 +40,7 @@ func NewEnabledNSQDAddons(contribOpts []string, n INSQD) *NSQDAddons { var activeAddons []INSQDAddon initializers := map[string]initializer{ - "-dogstatsd": NewNSQDDogStatsd, + "-dogstatsd-": NewNSQDDogStatsd, } n.Logf(nsqd.LOG_INFO, "Addons Initializing")