Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sgirones committed Oct 4, 2017
1 parent 5ddd599 commit 31f5251
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 31 deletions.
16 changes: 3 additions & 13 deletions discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,10 @@ type Endpoint struct {
Port uint16
}

func DiscoverEndpoints(serviceDNS string) ([]Endpoint, error) {
endpoints := []Endpoint{}
func DiscoverEndpoints(serviceDNS string) ([]*net.SRV, error) {
_, remotes, err := net.LookupSRV("", "", serviceDNS)

if err != nil {
return endpoints, err
}

for _, n := range remotes {
endpoints = append(endpoints, Endpoint{
Name: n.Target,
Port: n.Port,
})
return nil, err
}

return endpoints, nil
return remotes, nil
}
20 changes: 9 additions & 11 deletions messaging/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type NatsConfig struct {
TLS *tls.Config `mapstructure:"tls_conf"`
DiscoveryName string `mapstructure:"discovery_name"`
DiscoveryName string `split_words:"true" mapstructure:"discovery_name"`
Servers []string `mapstructure:"servers"`
LogsSubject string `mapstructure:"log_subject"`
}
Expand Down Expand Up @@ -66,14 +66,12 @@ func ConfigureNatsConnection(config *NatsConfig, log *logrus.Entry) (*nats.Conn,

// ConnectToNats will do a TLS connection to the nats servers specified
func ConnectToNats(config *NatsConfig, errHandler nats.ErrHandler) (*nats.Conn, error) {
var err error
serversString := config.ServerString()

if config.DiscoveryName != "" {
serversString, err = buildNatsServersString(config.DiscoveryName)
servers, err := discoverNatsURLs(config.DiscoveryName)
if err != nil {
return nil, err
}
config.Servers = servers
}

options := []nats.Option{}
Expand All @@ -91,7 +89,7 @@ func ConnectToNats(config *NatsConfig, errHandler nats.ErrHandler) (*nats.Conn,
options = append(options, nats.ErrorHandler(errHandler))
}

return nats.Connect(serversString, options...)
return nats.Connect(config.ServerString(), options...)
}

func ErrorHandler(log *logrus.Entry) nats.ErrHandler {
Expand All @@ -105,17 +103,17 @@ func ErrorHandler(log *logrus.Entry) nats.ErrHandler {
}
}

func buildNatsServersString(serviceName string) (string, error) {
natsUrls := []string{}
func discoverNatsURLs(serviceName string) ([]string, error) {
natsURLs := []string{}

endpoints, err := discovery.DiscoverEndpoints(serviceName)
if err != nil {
return "", err
return nil, err
}

for _, endpoint := range endpoints {
natsUrls = append(natsUrls, fmt.Sprintf("nats://%s:%d", endpoint.Name, endpoint.Port))
natsURLs = append(natsURLs, fmt.Sprintf("nats://%s:%d", endpoint.Target, endpoint.Port))
}

return strings.Join(natsUrls, ","), nil
return natsURLs, nil
}
13 changes: 6 additions & 7 deletions messaging/rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Consumer) Clone(queueName string, delivery *DeliveryDefinition) (*Consu

type RabbitConfig struct {
Servers []string `mapstructure:"servers"`
DiscoveryName string `mapstructure:"discovery_name"`
DiscoveryName string `split_words:"true" mapstructure:"discovery_name"`
TLS *tls.Config `mapstructure:"tls_conf"`

ExchangeDefinition ExchangeDefinition `envconfig:"exchange" mapstructure:"exchange"`
Expand Down Expand Up @@ -206,20 +206,19 @@ func ValidateRabbitConfigStruct(servers []string, exchange ExchangeDefinition, q

// ConnectToRabbit will open a TLS connection to rabbit mq
func ConnectToRabbit(config *RabbitConfig, log *logrus.Entry) (*Consumer, error) {
var err error
if err = ValidateRabbitConfig(config); err != nil {
if err := ValidateRabbitConfig(config); err != nil {
return nil, err
}

servers := config.Servers
if config.DiscoveryName != "" {
servers, err = discoverRabbitServers(config.DiscoveryName)
servers, err := discoverRabbitServers(config.DiscoveryName)
if err != nil {
return nil, err
}
config.Servers = servers
}

conn, err := DialToRabbit(servers, config.TLS, log)
conn, err := DialToRabbit(config.Servers, config.TLS, log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -414,7 +413,7 @@ func discoverRabbitServers(serviceName string) ([]string, error) {
}

for _, endpoint := range endpoints {
rabbitUrls = append(rabbitUrls, fmt.Sprintf("%s:%d", endpoint.Name, endpoint.Port))
rabbitUrls = append(rabbitUrls, fmt.Sprintf("%s:%d", endpoint.Target, endpoint.Port))
}

return rabbitUrls, nil
Expand Down

0 comments on commit 31f5251

Please sign in to comment.