Skip to content

Commit

Permalink
Merge pull request #38 from netlify/salvador/add-dns-discovery-package
Browse files Browse the repository at this point in the history
[discovery] Add DNS discovery methods. Rabbit and Nats config to support DNS discovery
  • Loading branch information
calavera authored Oct 4, 2017
2 parents 0445a47 + 31f5251 commit 44fb358
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 19 deletions.
18 changes: 18 additions & 0 deletions discovery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package discovery

import (
"net"
)

type Endpoint struct {
Name string
Port uint16
}

func DiscoverEndpoints(serviceDNS string) ([]*net.SRV, error) {
_, remotes, err := net.LookupSRV("", "", serviceDNS)
if err != nil {
return nil, err
}
return remotes, nil
}
26 changes: 14 additions & 12 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import:
- package: github.com/kelseyhightower/envconfig
version: v1.3.0
- package: github.com/nats-io/nats
version: v1.2.2
version: v1.3.0
- package: github.com/pkg/errors
version: v0.8.0
- package: github.com/rybit/nats_logrus_hook
Expand Down
33 changes: 29 additions & 4 deletions messaging/nats.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package messaging

import (
"fmt"
"strings"

"github.com/nats-io/nats"
"github.com/sirupsen/logrus"

"github.com/rybit/nats_logrus_hook"

"github.com/netlify/netlify-commons/discovery"
"github.com/netlify/netlify-commons/tls"
)

type NatsConfig struct {
TLS *tls.Config `mapstructure:"tls_conf"`
Servers []string `mapstructure:"servers"`
LogsSubject string `mapstructure:"log_subject"`
TLS *tls.Config `mapstructure:"tls_conf"`
DiscoveryName string `split_words:"true" mapstructure:"discovery_name"`
Servers []string `mapstructure:"servers"`
LogsSubject string `mapstructure:"log_subject"`
}

type MetricsConfig struct {
Expand All @@ -29,7 +32,6 @@ func (config *NatsConfig) ServerString() string {

func (config *NatsConfig) Fields() logrus.Fields {
f := logrus.Fields{

"logs_subject": config.LogsSubject,
"servers": strings.Join(config.Servers, ","),
}
Expand Down Expand Up @@ -64,6 +66,14 @@ func ConfigureNatsConnection(config *NatsConfig, log *logrus.Entry) (*nats.Conn,

// ConnectToNats will do a TLS connection to the nats servers specified
func ConnectToNats(config *NatsConfig, errHandler nats.ErrHandler) (*nats.Conn, error) {
if config.DiscoveryName != "" {
servers, err := discoverNatsURLs(config.DiscoveryName)
if err != nil {
return nil, err
}
config.Servers = servers
}

options := []nats.Option{}
if config.TLS != nil {
tlsConfig, err := config.TLS.TLSConfig()
Expand Down Expand Up @@ -92,3 +102,18 @@ func ErrorHandler(log *logrus.Entry) nats.ErrHandler {
}).Error("Error while consuming from " + sub.Subject)
}
}

func discoverNatsURLs(serviceName string) ([]string, error) {
natsURLs := []string{}

endpoints, err := discovery.DiscoverEndpoints(serviceName)
if err != nil {
return nil, err
}

for _, endpoint := range endpoints {
natsURLs = append(natsURLs, fmt.Sprintf("nats://%s:%d", endpoint.Target, endpoint.Port))
}

return natsURLs, nil
}
29 changes: 27 additions & 2 deletions messaging/rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/netlify/netlify-commons/discovery"
"github.com/netlify/netlify-commons/tls"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
Expand Down Expand Up @@ -40,8 +41,9 @@ func (c *Consumer) Clone(queueName string, delivery *DeliveryDefinition) (*Consu
}

type RabbitConfig struct {
Servers []string `mapstructure:"servers"`
TLS *tls.Config `mapstructure:"tls_conf"`
Servers []string `mapstructure:"servers"`
DiscoveryName string `split_words:"true" mapstructure:"discovery_name"`
TLS *tls.Config `mapstructure:"tls_conf"`

ExchangeDefinition ExchangeDefinition `envconfig:"exchange" mapstructure:"exchange"`
QueueDefinition QueueDefinition `envconfig:"queue" mapstructure:"queue"`
Expand Down Expand Up @@ -208,6 +210,14 @@ func ConnectToRabbit(config *RabbitConfig, log *logrus.Entry) (*Consumer, error)
return nil, err
}

if config.DiscoveryName != "" {
servers, err := discoverRabbitServers(config.DiscoveryName)
if err != nil {
return nil, err
}
config.Servers = servers
}

conn, err := DialToRabbit(config.Servers, config.TLS, log)
if err != nil {
return nil, err
Expand Down Expand Up @@ -394,6 +404,21 @@ func Consume(channel *amqp.Channel, deliveryDef *DeliveryDefinition) (<-chan amq
)
}

func discoverRabbitServers(serviceName string) ([]string, error) {
rabbitUrls := []string{}

endpoints, err := discovery.DiscoverEndpoints(serviceName)
if err != nil {
return rabbitUrls, err
}

for _, endpoint := range endpoints {
rabbitUrls = append(rabbitUrls, fmt.Sprintf("%s:%d", endpoint.Target, endpoint.Port))
}

return rabbitUrls, nil
}

// ----------------------------------------------------------------------------
// utils
// ----------------------------------------------------------------------------
Expand Down

0 comments on commit 44fb358

Please sign in to comment.