diff --git a/cfg/cfg.go b/cfg/cfg.go index 831822af..9203fcf7 100644 --- a/cfg/cfg.go +++ b/cfg/cfg.go @@ -140,17 +140,20 @@ type Rewriter struct { } type Amqp struct { - Amqp_enabled bool - Amqp_host string - Amqp_port int - Amqp_vhost string - Amqp_user string - Amqp_password string - Amqp_exchange string - Amqp_queue string - Amqp_key string - Amqp_durable bool - Amqp_exclusive bool + Amqp_enabled bool + Amqp_host string + Amqp_port int + Amqp_vhost string + Amqp_user string + Amqp_password string + Amqp_exchange string + Amqp_queue string + Amqp_key string + Amqp_durable bool + Amqp_exclusive bool + Amqp_heartbeat int + Amqp_retry bool + Amqp_retrydelay int } type Init struct { diff --git a/examples/carbon-relay-ng.ini b/examples/carbon-relay-ng.ini index b98915eb..edca0237 100644 --- a/examples/carbon-relay-ng.ini +++ b/examples/carbon-relay-ng.ini @@ -73,6 +73,9 @@ amqp_queue = "" amqp_key = "#" amqp_durable = false amqp_exclusive = true +amqp_heartbeat = 70 +amqp_retry = true +amqp_retrydelay = 30 # Aggregators # See https://github.com/grafana/carbon-relay-ng/blob/master/docs/config.md#Aggregators diff --git a/input/amqp.go b/input/amqp.go index 1d86aece..2a971517 100644 --- a/input/amqp.go +++ b/input/amqp.go @@ -144,12 +144,62 @@ type amqpConnector func(a *Amqp) error // AMQPConnector connects using the given configuration func AMQPConnector(a *Amqp) error { log.Infof("dialing AMQP: %v", a.uri) - conn, err := amqp.Dial(a.uri.String()) - if err != nil { - return err + + var conn *amqp.Connection + var err error + + for { + config := amqp.Config{ + Heartbeat: time.Duration(a.config.Amqp.Amqp_heartbeat) * time.Second, + } + + conn, err = amqp.DialConfig(a.uri.String(), config) + if err == nil { + break + } + + if !a.config.Amqp.Amqp_retry { + return err + } + + log.Errorf("Failed to connect to AMQP server: %v. Retrying in %d seconds...", err, a.config.Amqp.Amqp_retrydelay) + time.Sleep(time.Duration(a.config.Amqp.Amqp_retrydelay) * time.Second) } + a.conn = conn + // Create a channel to receive close notifications from the connection + closeCh := make(chan *amqp.Error) + conn.NotifyClose(closeCh) + + // Start a goroutine to monitor the connection state + go func() { + for { + select { + case <-closeCh: + log.Println("AMQP connection closed.") + + if !a.config.Amqp.Amqp_retry { + log.Println("Retry is disabled. Exiting reconnection attempt.") + return + } + + log.Println("Attempting to reconnect...") + + for { + err := AMQPConnector(a) + if err == nil { + log.Println("Successfully reconnected to AMQP server.") + return + } + + log.Errorf("Failed to reconnect to AMQP server: %v. Retrying in %d seconds...", err, a.config.Amqp.Amqp_retrydelay) + time.Sleep(time.Duration(a.config.Amqp.Amqp_retrydelay) * time.Second) + } + } + } + }() + amqpChan, err := conn.Channel() if err != nil { a.conn.Close()