Skip to content

Commit

Permalink
Added reconnection logic when NATS is disconnected
Browse files Browse the repository at this point in the history
How was it tested:
I deployed this version of nats-queue-worker and then simulated
NATS disconnection

Signed-off-by: Bart Smykla <[email protected]>
  • Loading branch information
Bart Smykla committed Jan 14, 2019
1 parent 4d38388 commit c747603
Showing 1 changed file with 55 additions and 3 deletions.
58 changes: 55 additions & 3 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,67 @@ import (
"encoding/json"
"fmt"
"log"
"sync"
"time"

"github.com/nats-io/go-nats-streaming"
"github.com/openfaas/faas/gateway/queue"
)

var (
maxReconnect = 5
delayBetweenReconnect = 2
)

// NatsQueue queue for work
type NatsQueue struct {
nc stan.Conn
ncMutex *sync.RWMutex
ClientID string
ClusterID string
NATSURL string
Topic string
}

func (q *NatsQueue) connect() error {
nc, err := stan.Connect(
q.ClusterID,
q.ClientID,
stan.NatsURL(q.NATSURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.NATSURL)

q.reconnect(0)
}),
)

if err != nil {
return err
}

q.ncMutex.Lock()
q.nc = nc
q.ncMutex.Unlock()

return nil
}

func (q *NatsQueue) reconnect(iteration int) {
log.Printf("Trying to reconnect (%d) to %s\n", iteration, q.NATSURL)

if iteration < maxReconnect {
time.Sleep(time.Second * time.Duration(iteration*delayBetweenReconnect))

if err := q.connect(); err != nil {
log.Printf("Reconnection (%d) to %s failed", iteration, q.NATSURL)

q.reconnect(iteration + 1)
} else {
log.Printf("Reconnection (%d) to %s succed", iteration, q.NATSURL)
}
}
}

// CreateNatsQueue ready for asynchronous processing
func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQueue, error) {
var err error
Expand All @@ -27,15 +74,16 @@ func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQu
clientID := clientConfig.GetClientID()
clusterID := "faas-cluster"

nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
queue1 := NatsQueue{
nc: nc,
ClientID: clientID,
ClusterID: clusterID,
NATSURL: natsURL,
Topic: "faas-request",
ncMutex: &sync.RWMutex{},
}

err = queue1.connect()

return &queue1, err
}

Expand All @@ -50,7 +98,11 @@ func (q *NatsQueue) Queue(req *queue.Request) error {
log.Println(err)
}

err = q.nc.Publish(q.Topic, out)
q.ncMutex.RLock()
nc := q.nc
q.ncMutex.RUnlock()

err = nc.Publish(q.Topic, out)

return err
}

0 comments on commit c747603

Please sign in to comment.