Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib: skeleton app structure & Dogstatsd nsqd addon #909

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
"github.com/nsqio/nsq/contrib"
)

type tlsRequiredOption int
Expand Down Expand Up @@ -200,6 +201,13 @@ func (p *program) Start() error {
opts := nsqd.NewOptions()

flagSet := nsqdFlagSet(opts)

// add the contrib options to these
contribOpts := contrib.NewContribOptions()
// add the flags for each of contrib
contrib.AddNSQDContribFlags(contribOpts, flagSet)

// add the contrib flags to these
flagSet.Parse(os.Args[1:])

rand.Seed(time.Now().UTC().UnixNano())
Expand All @@ -220,6 +228,8 @@ func (p *program) Start() error {
cfg.Validate()

options.Resolve(opts, flagSet, cfg)
options.Resolve(contribOpts, flagSet, cfg)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

best not to add unrelated whitespace changes here

nsqd := nsqd.New(opts)

err := nsqd.LoadMetadata()
Expand All @@ -232,6 +242,10 @@ func (p *program) Start() error {
}
nsqd.Main()

// hook into addons
addons := contrib.NewNSQDAddons(contribOpts, nsqd)
addons.Start()

p.nsqd = nsqd
return nil
}
Expand Down
78 changes: 78 additions & 0 deletions contrib/datadog_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package contrib

import (
"errors"
"fmt"
"net"
"strings"
"time"
)

type DataDogClient struct {
conn net.Conn
addr string
prefix string
}

type DataDogTags struct {
tags map[string]string
}

// returns dogstatd compatible string
// "#tag1:value1,tag2:value2
func (ddt *DataDogTags) String() string {
ts := []string{}
for k, v := range ddt.tags {
ts = append(ts, fmt.Sprintf("%s:%s", k, v))
}
return "#" + strings.Join(ts, ",")
}

func NewDataDogClient(addr string, prefix string) *DataDogClient {
return &DataDogClient{
addr: addr,
prefix: prefix,
}
}

func (c *DataDogClient) String() string {
return c.addr
}

func (c *DataDogClient) CreateSocket() error {
conn, err := net.DialTimeout("udp", c.addr, time.Second)
if err != nil {
return err
}
c.conn = conn
return nil
}

func (c *DataDogClient) Close() error {
return c.conn.Close()
}

func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", count, tags)
}

func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", -count, tags)
}

func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error {
return c.send(stat, "%d|ms", delta, tags)
}

func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error {
return c.send(stat, "%d|g", value, tags)
}

func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error {
if c.conn == nil {
return errors.New("not connected")
}
format = fmt.Sprintf("%s%s:%s|%s", c.prefix, stat, format, tags.String())
_, err := fmt.Fprintf(c.conn, format, value)
return err
}
1 change: 1 addition & 0 deletions contrib/datadog_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package contrib
195 changes: 195 additions & 0 deletions contrib/dogstatsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package contrib

import (
"fmt"
"github.com/nsqio/nsq/nsqd"
"time"
)

type NSQDDogStatsdOptions struct {
DogStatsdAddress string `flag:"dogstatsd-address"`
DogStatsdPrefix string `flag:"dogstatsd-prefix"`
DogStatsdInterval time.Duration `flag:"dogstatsd-interval"`
}

func NewNSQDDogStatsdDefaultOptions() *NSQDDogStatsdOptions {
return &NSQDDogStatsdOptions{
DogStatsdPrefix: "nsq.",
DogStatsdInterval: 10 * time.Second,
}
}

type NSQDDogStatsd struct {
nsqd *nsqd.NSQD
contribOpts *NSQDContribOptions
}

func (dd *NSQDDogStatsd) Active() bool {
if dd.contribOpts.DogStatsdAddress != "" {
return true
} else {
return false
}
}

func (dd *NSQDDogStatsd) Start() {
logger.Println("Starting nsqd datadog")

dd.nsqd.RegisterAddon(dd.Loop)
}

func (dd *NSQDDogStatsd) Loop() {
// var lastMemStats *nsqd.memStats
var lastStats []nsqd.TopicStats
var stat string

ticker := time.NewTicker(dd.contribOpts.DogStatsdInterval)

dd.nsqd.Logf(nsqd.LOG_DEBUG, "Loop started")
exitChan := *dd.nsqd.ExitChan()

for {
select {
case <-exitChan:
goto exit
case <-ticker.C:
dd.nsqd.Logf(nsqd.LOG_DEBUG, "LOOPING")

client := NewDataDogClient(
dd.contribOpts.DogStatsdAddress,
dd.contribOpts.DogStatsdPrefix,
)
err := client.CreateSocket()
if err != nil {
dd.nsqd.Logf(nsqd.LOG_ERROR, "failed to create UDP socket to dogstatsd(%s)", client)
continue
}

dd.nsqd.Logf(nsqd.LOG_INFO, "DOGSTATSD: pushing stats to %s", client)

stats := dd.nsqd.GetStats()
for _, topic := range stats {
// try to find the topic in the last collection
lastTopic := nsqd.TopicStats{}
for _, checkTopic := range lastStats {
if topic.TopicName == checkTopic.TopicName {
lastTopic = checkTopic
break
}
}
diff := topic.MessageCount - lastTopic.MessageCount

client.Incr("message_count", int64(diff), &DataDogTags{
tags: map[string]string{"topic_name": topic.TopicName},
})

client.Gauge("topic.depth", topic.Depth, &DataDogTags{
tags: map[string]string{"topic_name": topic.TopicName},
})

client.Gauge("topic.backend_depth", topic.BackendDepth, &DataDogTags{
tags: map[string]string{"topic_name": topic.TopicName},
})

for _, item := range topic.E2eProcessingLatency.Percentiles {
stat = fmt.Sprintf("topic.e2e_processing_latency_%.0f", item["quantile"]*100.0)
// We can cast the value to int64 since a value of 1 is the
// minimum resolution we will have, so there is no loss of
// accuracy
client.Gauge(stat, int64(item["value"]), &DataDogTags{
tags: map[string]string{"topic_name": topic.TopicName},
})
}

for _, channel := range topic.Channels {
// try to find the channel in the last collection
lastChannel := nsqd.ChannelStats{}
for _, checkChannel := range lastTopic.Channels {
if channel.ChannelName == checkChannel.ChannelName {
lastChannel = checkChannel
break
}
}
diff := channel.MessageCount - lastChannel.MessageCount
client.Incr("channel.message_count", int64(diff), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

client.Gauge("channel.depth", channel.Depth, &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

client.Gauge("channel.backend_depth", channel.BackendDepth, &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

// stat = fmt.Sprintf("topic.%s.channel.%s.in_flight_count", topic.TopicName, channel.ChannelName)
client.Gauge("channel.in_flight_count", int64(channel.InFlightCount), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

// stat = fmt.Sprintf("topic.%s.channel.%s.deferred_count", topic.TopicName, channel.ChannelName)
client.Gauge("channel.deferred_count", int64(channel.DeferredCount), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

diff = channel.RequeueCount - lastChannel.RequeueCount
// stat = fmt.Sprintf("topic.%s.channel.%s.requeue_count", topic.TopicName, channel.ChannelName)
client.Incr("channel.requeue_count", int64(diff), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

diff = channel.TimeoutCount - lastChannel.TimeoutCount
// stat = fmt.Sprintf("topic.%s.channel.%s.timeout_count", topic.TopicName, channel.ChannelName)
client.Incr("channel.timeout_count", int64(diff), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

// stat = fmt.Sprintf("topic.%s.channel.%s.clients", topic.TopicName, channel.ChannelName)
client.Gauge("channel.clients", int64(len(channel.Clients)), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})

for _, item := range channel.E2eProcessingLatency.Percentiles {
stat = fmt.Sprintf("channel.e2e_processing_latency_%.0f", item["quantile"]*100.0)
client.Gauge(stat, int64(item["value"]), &DataDogTags{
tags: map[string]string{
"topic_name": topic.TopicName,
"channel_name": channel.ChannelName,
},
})
}
}
}
lastStats = stats
client.Close()
}
}

exit:
ticker.Stop()
}
60 changes: 60 additions & 0 deletions contrib/nsqd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package contrib

import (
"flag"
"github.com/nsqio/nsq/nsqd"
"log"
"os"
"time"
)

var logger = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds)

type INSQDAddon interface {
Active() bool
Start()
}

type NSQDContribOptions struct {
*NSQDDogStatsdOptions
}

// Instantiates all contrib default options
func NewContribOptions() *NSQDContribOptions {
return &NSQDContribOptions{
NewNSQDDogStatsdDefaultOptions(),
}
}

func AddNSQDContribFlags(opts *NSQDContribOptions, flagSet *flag.FlagSet) {
flagSet.String("dogstatsd-address", opts.DogStatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats")
flagSet.Duration("dogstatsd-interval", opts.DogStatsdInterval, "duration between pushing to dogstatsd")
// flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd")
flagSet.String("dogstatsd-prefix", opts.DogStatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)")
}

type NSQDAddons struct {
addons []INSQDAddon
}

// Starts all addons that are active
func (as *NSQDAddons) Start() {
logger.Println("Starting All addons")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should somehow go through NSQD.logf() too.


for _, addon := range as.addons {
if addon.Active() {
addon.Start()
}
}
}

func NewNSQDAddons(contribOpts *NSQDContribOptions, nsqd *nsqd.NSQD) *NSQDAddons {
return &NSQDAddons{
addons: []INSQDAddon{
&NSQDDogStatsd{
contribOpts: contribOpts,
nsqd: nsqd,
},
},
}
}
6 changes: 6 additions & 0 deletions nsqd/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ func (n *NSQD) logf(level lg.LogLevel, f string, args ...interface{}) {
opts := n.getOpts()
lg.Logf(opts.Logger, opts.logLevel, level, f, args...)
}

// would like to expose logf to the contrib modules so that contrib can share the
// configuration end user specifies on CLI
func (n *NSQD) Logf(level lg.LogLevel, f string, args ...interface{}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A possibly cleaner alternative is to pass the logf bound-method reference to NewNSQDAddons() or NSQDAddons.Start(), which could take a internal.lg.AppLogFunc type. For example see

protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
and
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {

n.logf(level, f, args...)
}
Loading