Skip to content

Commit

Permalink
Merge pull request #12 from Clarilab/add-functionality-to-renew-an-ex…
Browse files Browse the repository at this point in the history
…isting-connection

Add functionality to renew an existing connection
  • Loading branch information
nicoandrewss authored Jan 22, 2024
2 parents d6ca9ec + 31eefa3 commit 39d4b26
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 1 deletion.
48 changes: 48 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,52 @@ func (c *Connection) Recover() error {
return nil
}

// Renew can be used to establish a new connection.
// If new URI is provided, it will be used to renew the connection instead of the current URI.
func (c *Connection) Renew(uri ...string) error {
const errMessage = "failed to renew: %w"

if len(uri) == 1 {
c.options.uriMU.Lock()
c.options.uri = uri[0]
c.options.uriMU.Unlock()
}

if err := c.closeForRenewal(); err != nil {
return fmt.Errorf(errMessage, err)
}

if err := c.recoverConnection(); err != nil {
return fmt.Errorf(errMessage, err)
}

if err := c.recoverChannel(); err != nil {
return fmt.Errorf(errMessage, err)
}

return nil
}

func (c *Connection) closeForRenewal() error {
const errMessage = "failed to close the connection to the broker gracefully on renewal: %w"

if c.amqpConnection != nil {
c.logger.logDebug("closing connection")

c.connectionCloseWG.Add(closeWGDelta)

if err := c.amqpConnection.Close(); err != nil {
return fmt.Errorf(errMessage, err)
}

c.connectionCloseWG.Wait()

c.logger.logDebug("gracefully closed connection to the broker")
}

return nil
}

// RemoveQueue removes the queue from the broker including all bindings then purges the messages based on
// broker configuration, returning the number of messages purged.
//
Expand Down Expand Up @@ -224,7 +270,9 @@ func (c *Connection) createConnection() error {
var err error

c.amqpConnMU.Lock()
c.options.uriMU.RLock()
c.amqpConnection, err = amqp.DialConfig(c.options.uri, amqp.Config(*c.options.Config))
c.options.uriMU.RUnlock()
c.amqpConnMU.Unlock()

if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package clarimq

import (
"encoding/json"
"fmt"
"net"
"net/url"
"strconv"
"sync"
"time"

amqp "github.com/rabbitmq/amqp091-go"
Expand Down Expand Up @@ -32,6 +37,7 @@ type (
Config *Config
codec *codec
uri string
uriMU *sync.RWMutex
PrefetchCount int
RecoveryInterval time.Duration
MaxRecoveryRetries int
Expand All @@ -53,9 +59,23 @@ type (
ReturnHandler func(Return)
)

// ToURI returns the URI representation of the ConnectionSettings.
// Includes url escaping for safe usage.
func (c *ConnectionSettings) ToURI() string {
return fmt.Sprintf("amqp://%s:%s@%s/",
url.QueryEscape(c.UserName),
url.QueryEscape(c.Password),
net.JoinHostPort(
url.QueryEscape(c.Host),
strconv.Itoa(c.Port),
),
)
}

func defaultConnectionOptions(uri string) *ConnectionOptions {
return &ConnectionOptions{
uri: uri,
uriMU: &sync.RWMutex{},
RecoveryInterval: defaultRecoveryInterval,
MaxRecoveryRetries: defaultMaxRecoveryRetries,
BackOffFactor: defaultBackOffFactor,
Expand Down
6 changes: 5 additions & 1 deletion retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ func (c *Consumer) setupRetryPublisher() error {
var err error

if c.options.RetryOptions.RetryConn == nil {
c.conn.options.uriMU.RLock()
uri := c.conn.options.uri
c.conn.options.uriMU.RUnlock()

if c.options.RetryOptions.RetryConn, err = NewConnection(
c.conn.options.uri,
uri,
WithConnectionOptionConnectionName(fmt.Sprintf("%s_%s", c.options.ConsumerOptions.Name, keyRetry)),
); err != nil {
return fmt.Errorf(errMessage, err)
Expand Down

0 comments on commit 39d4b26

Please sign in to comment.