Skip to content

Commit

Permalink
feat: adding AMQP heartbeat/retry/retrydelay support (#549)
Browse files Browse the repository at this point in the history
* feat: adding AMQP heartbeat/retry/retrydelay support

* fix: removing AMQP heartbeat/retry/retrydelay redudant code
  • Loading branch information
kiraum authored Jun 14, 2024
1 parent bb3e8ec commit b30db90
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
25 changes: 14 additions & 11 deletions cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,20 @@ type Rewriter struct {
}

type Amqp struct {
Amqp_enabled bool
Amqp_host string
Amqp_port int
Amqp_vhost string
Amqp_user string
Amqp_password string
Amqp_exchange string
Amqp_queue string
Amqp_key string
Amqp_durable bool
Amqp_exclusive bool
Amqp_enabled bool
Amqp_host string
Amqp_port int
Amqp_vhost string
Amqp_user string
Amqp_password string
Amqp_exchange string
Amqp_queue string
Amqp_key string
Amqp_durable bool
Amqp_exclusive bool
Amqp_heartbeat int
Amqp_retry bool
Amqp_retrydelay int
}

type Init struct {
Expand Down
3 changes: 3 additions & 0 deletions examples/carbon-relay-ng.ini
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ amqp_queue = ""
amqp_key = "#"
amqp_durable = false
amqp_exclusive = true
amqp_heartbeat = 70
amqp_retry = true
amqp_retrydelay = 30

# Aggregators
# See https://github.com/grafana/carbon-relay-ng/blob/master/docs/config.md#Aggregators
Expand Down
49 changes: 46 additions & 3 deletions input/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,55 @@ type amqpConnector func(a *Amqp) error
// AMQPConnector connects using the given configuration
func AMQPConnector(a *Amqp) error {
log.Infof("dialing AMQP: %v", a.uri)
conn, err := amqp.Dial(a.uri.String())
if err != nil {
return err

var conn *amqp.Connection
var err error

for {
config := amqp.Config{
Heartbeat: time.Duration(a.config.Amqp.Amqp_heartbeat) * time.Second,
}

conn, err = amqp.DialConfig(a.uri.String(), config)
if err == nil {
log.Printf("Successfully connected to AMQP server: %v.", a.uri)
break
}

if !a.config.Amqp.Amqp_retry {
return err
}

log.Errorf("Failed to connect to AMQP server: %v. Retrying in %d seconds...", err, a.config.Amqp.Amqp_retrydelay)
time.Sleep(time.Duration(a.config.Amqp.Amqp_retrydelay) * time.Second)
}

a.conn = conn

// Create a channel to receive close notifications from the connection
closeCh := make(chan *amqp.Error)
conn.NotifyClose(closeCh)

// Start a goroutine to monitor the connection state
go func() {
for {
select {
case <-closeCh:
log.Println("AMQP connection closed.")

if !a.config.Amqp.Amqp_retry {
log.Println("Retry is disabled. Exiting reconnection attempt.")
return
}

log.Println("Attempting to reconnect...")

AMQPConnector(a)
return
}
}
}()

amqpChan, err := conn.Channel()
if err != nil {
a.conn.Close()
Expand Down

0 comments on commit b30db90

Please sign in to comment.