Skip to content

Commit

Permalink
Added reconnection logic when NATS is disconnected
Browse files Browse the repository at this point in the history
How was it tested:
I deployed this version of nats-queue-worker and then simulated
NATS disconnection

Signed-off-by: Bart Smykla <[email protected]>
  • Loading branch information
Bart Smykla authored and alexellis committed Jan 29, 2019
1 parent afb259c commit c41f9b1
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 11 deletions.
70 changes: 59 additions & 11 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,65 @@ import (
"encoding/json"
"fmt"
"log"
"sync"
"time"

"github.com/nats-io/go-nats-streaming"
"github.com/openfaas/faas/gateway/queue"
)

// NatsQueue queue for work
type NatsQueue struct {
nc stan.Conn
nc stan.Conn
ncMutex *sync.RWMutex
maxReconnect int
reconnectDelay time.Duration

ClientID string
ClusterID string
NATSURL string
Topic string
}

func (q *NatsQueue) connect() error {
nc, err := stan.Connect(
q.ClusterID,
q.ClientID,
stan.NatsURL(q.NATSURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.NATSURL)

q.reconnect()
}),
)

if err != nil {
return err
}

q.ncMutex.Lock()
q.nc = nc
q.ncMutex.Unlock()

return nil
}

func (q *NatsQueue) reconnect() {
for i := 0; i < q.maxReconnect; i++ {
time.Sleep(time.Second * time.Duration(i) * q.reconnectDelay)

if err := q.connect(); err == nil {
log.Printf("Reconnection (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.NATSURL)

return
}

log.Printf("Reconnection (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.NATSURL)
}

log.Printf("Reconnection limit (%d) reached\n", q.maxReconnect)
}

// CreateNatsQueue ready for asynchronous processing
func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQueue, error) {
var err error
Expand All @@ -27,30 +72,33 @@ func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQu
clientID := clientConfig.GetClientID()
clusterID := "faas-cluster"

nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
queue1 := NatsQueue{
nc: nc,
ClientID: clientID,
ClusterID: clusterID,
NATSURL: natsURL,
Topic: "faas-request",
ClientID: clientID,
ClusterID: clusterID,
NATSURL: natsURL,
Topic: "faas-request",
maxReconnect: clientConfig.GetMaxReconnect(),
reconnectDelay: clientConfig.GetReconnectDelay(),
ncMutex: &sync.RWMutex{},
}

err = queue1.connect()

return &queue1, err
}

// Queue request for processing
func (q *NatsQueue) Queue(req *queue.Request) error {
var err error

fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function)

out, err := json.Marshal(req)
if err != nil {
log.Println(err)
}

err = q.nc.Publish(q.Topic, out)
q.ncMutex.RLock()
nc := q.nc
q.ncMutex.RUnlock()

return err
return nc.Publish(q.Topic, out)
}
17 changes: 17 additions & 0 deletions handler/nats_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ package handler

import (
"os"
"time"

"github.com/openfaas/nats-queue-worker/nats"
)

type NatsConfig interface {
GetClientID() string
GetMaxReconnect() int
GetReconnectDelay() time.Duration
}

type DefaultNatsConfig struct {
maxReconnect int
reconnectDelay time.Duration
}

func NewDefaultNatsConfig(maxReconnect int, reconnectDelay time.Duration) DefaultNatsConfig {
return DefaultNatsConfig{maxReconnect, reconnectDelay}
}

// GetClientID returns the ClientID assigned to this producer/consumer.
Expand All @@ -19,6 +28,14 @@ func (DefaultNatsConfig) GetClientID() string {
return getClientID(val)
}

func (c DefaultNatsConfig) GetMaxReconnect() int {
return c.maxReconnect
}

func (c DefaultNatsConfig) GetReconnectDelay() time.Duration {
return c.reconnectDelay
}

func getClientID(hostname string) string {
return "faas-publisher-" + nats.GetClientID(hostname)
}

0 comments on commit c41f9b1

Please sign in to comment.