diff --git a/discovery/main.go b/discovery/main.go new file mode 100644 index 0000000..a5f9f0a --- /dev/null +++ b/discovery/main.go @@ -0,0 +1,18 @@ +package discovery + +import ( + "net" +) + +type Endpoint struct { + Name string + Port uint16 +} + +func DiscoverEndpoints(serviceDNS string) ([]*net.SRV, error) { + _, remotes, err := net.LookupSRV("", "", serviceDNS) + if err != nil { + return nil, err + } + return remotes, nil +} diff --git a/glide.lock b/glide.lock index b51c1d6..5e98fd7 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: eed205870b7bc915d4c42a3cccea1c68e63bde3494d1aea9384777c34bf886cc -updated: 2017-09-18T15:34:24.716314689-07:00 +hash: 339fd13f8046318b0a0df3f78ef85db9fc7536ed1e15fb5223109515f1ed2bf0 +updated: 2017-10-03T10:49:34.76139528+02:00 imports: - name: github.com/BurntSushi/toml version: b26d9c308763d68093482582cea63d69be07a0f0 @@ -14,10 +14,10 @@ imports: subpackages: - hcl/ast - hcl/parser - - hcl/token - - json/parser - hcl/scanner - hcl/strconv + - hcl/token + - json/parser - json/scanner - json/token - name: github.com/inconshreveable/mousetrap @@ -32,11 +32,13 @@ imports: version: 8d7837e64d3c1ee4e54a880c5a920ab4316fc90a - name: github.com/mitchellh/mapstructure version: d0303fe809921458f417bcf828397a65db30a7e4 -- name: github.com/nats-io/nats - version: 61923ed1eaf8398000991fbbee2ef11ab5a5be0d +- name: github.com/nats-io/go-nats + version: 4b47946e1082797665a3da33c92860d4553455fe subpackages: - encoders/builtin - util +- name: github.com/nats-io/nats + version: 29f9728a183bf3fa7e809e14edac00b33be72088 - name: github.com/nats-io/nuid version: 3cf34f9fca4e88afa9da8eabd75e3326c9941b44 - name: github.com/pelletier/go-toml @@ -52,13 +54,13 @@ imports: - name: github.com/signalfx/golib version: cb7680940d605b817db79790c241eed2a00fa6e6 subpackages: - - sfxclient - datapoint - errors - event + - eventcounter - log + - sfxclient - timekeeper - - eventcounter - name: github.com/sirupsen/logrus version: a3f95b5c423586578a4e099b11a46c2479628cac - name: github.com/spf13/afero @@ -82,7 +84,7 @@ imports: subpackages: - context - name: golang.org/x/sys - version: 062cd7e4e68206d8bab9b18396626e855c992658 + version: f7928cfef4d09d1b080aa2b6fd3ca9ba1567c733 subpackages: - unix - name: golang.org/x/text @@ -96,9 +98,9 @@ imports: version: 3f83fa5005286a7fe593b055f0d7771a7dce4655 subpackages: - bson + - internal/json - internal/sasl - internal/scram - - internal/json - name: gopkg.in/stack.v1 version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf - name: gopkg.in/yaml.v2 @@ -111,12 +113,12 @@ testImports: - name: github.com/nats-io/gnatsd version: 5dcad241bcd7fff3aac6e9f8b47350f071f5fd38 subpackages: - - test - auth - - server - conf - logger + - server - server/pse + - test - util - name: github.com/pmezard/go-difflib version: d8ed2627bdf02c080bf22230dbb337003b7aba2d diff --git a/glide.yaml b/glide.yaml index 174067a..51efef1 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/kelseyhightower/envconfig version: v1.3.0 - package: github.com/nats-io/nats - version: v1.2.2 + version: v1.3.0 - package: github.com/pkg/errors version: v0.8.0 - package: github.com/rybit/nats_logrus_hook diff --git a/messaging/nats.go b/messaging/nats.go index 0338ec4..5a8791c 100644 --- a/messaging/nats.go +++ b/messaging/nats.go @@ -1,6 +1,7 @@ package messaging import ( + "fmt" "strings" "github.com/nats-io/nats" @@ -8,13 +9,15 @@ import ( "github.com/rybit/nats_logrus_hook" + "github.com/netlify/netlify-commons/discovery" "github.com/netlify/netlify-commons/tls" ) type NatsConfig struct { - TLS *tls.Config `mapstructure:"tls_conf"` - Servers []string `mapstructure:"servers"` - LogsSubject string `mapstructure:"log_subject"` + TLS *tls.Config `mapstructure:"tls_conf"` + DiscoveryName string `split_words:"true" mapstructure:"discovery_name"` + Servers []string `mapstructure:"servers"` + LogsSubject string `mapstructure:"log_subject"` } type MetricsConfig struct { @@ -29,7 +32,6 @@ func (config *NatsConfig) ServerString() string { func (config *NatsConfig) Fields() logrus.Fields { f := logrus.Fields{ - "logs_subject": config.LogsSubject, "servers": strings.Join(config.Servers, ","), } @@ -64,6 +66,14 @@ func ConfigureNatsConnection(config *NatsConfig, log *logrus.Entry) (*nats.Conn, // ConnectToNats will do a TLS connection to the nats servers specified func ConnectToNats(config *NatsConfig, errHandler nats.ErrHandler) (*nats.Conn, error) { + if config.DiscoveryName != "" { + servers, err := discoverNatsURLs(config.DiscoveryName) + if err != nil { + return nil, err + } + config.Servers = servers + } + options := []nats.Option{} if config.TLS != nil { tlsConfig, err := config.TLS.TLSConfig() @@ -92,3 +102,18 @@ func ErrorHandler(log *logrus.Entry) nats.ErrHandler { }).Error("Error while consuming from " + sub.Subject) } } + +func discoverNatsURLs(serviceName string) ([]string, error) { + natsURLs := []string{} + + endpoints, err := discovery.DiscoverEndpoints(serviceName) + if err != nil { + return nil, err + } + + for _, endpoint := range endpoints { + natsURLs = append(natsURLs, fmt.Sprintf("nats://%s:%d", endpoint.Target, endpoint.Port)) + } + + return natsURLs, nil +} diff --git a/messaging/rabbit.go b/messaging/rabbit.go index 2cd752f..ec75a87 100644 --- a/messaging/rabbit.go +++ b/messaging/rabbit.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/netlify/netlify-commons/discovery" "github.com/netlify/netlify-commons/tls" "github.com/sirupsen/logrus" "github.com/streadway/amqp" @@ -40,8 +41,9 @@ func (c *Consumer) Clone(queueName string, delivery *DeliveryDefinition) (*Consu } type RabbitConfig struct { - Servers []string `mapstructure:"servers"` - TLS *tls.Config `mapstructure:"tls_conf"` + Servers []string `mapstructure:"servers"` + DiscoveryName string `split_words:"true" mapstructure:"discovery_name"` + TLS *tls.Config `mapstructure:"tls_conf"` ExchangeDefinition ExchangeDefinition `envconfig:"exchange" mapstructure:"exchange"` QueueDefinition QueueDefinition `envconfig:"queue" mapstructure:"queue"` @@ -208,6 +210,14 @@ func ConnectToRabbit(config *RabbitConfig, log *logrus.Entry) (*Consumer, error) return nil, err } + if config.DiscoveryName != "" { + servers, err := discoverRabbitServers(config.DiscoveryName) + if err != nil { + return nil, err + } + config.Servers = servers + } + conn, err := DialToRabbit(config.Servers, config.TLS, log) if err != nil { return nil, err @@ -394,6 +404,21 @@ func Consume(channel *amqp.Channel, deliveryDef *DeliveryDefinition) (<-chan amq ) } +func discoverRabbitServers(serviceName string) ([]string, error) { + rabbitUrls := []string{} + + endpoints, err := discovery.DiscoverEndpoints(serviceName) + if err != nil { + return rabbitUrls, err + } + + for _, endpoint := range endpoints { + rabbitUrls = append(rabbitUrls, fmt.Sprintf("%s:%d", endpoint.Target, endpoint.Port)) + } + + return rabbitUrls, nil +} + // ---------------------------------------------------------------------------- // utils // ----------------------------------------------------------------------------