diff --git a/route/kafkamdm.go b/route/kafkamdm.go index eeaa63bd..3689c4c7 100644 --- a/route/kafkamdm.go +++ b/route/kafkamdm.go @@ -111,24 +111,20 @@ func (r *KafkaMdm) run() { ticker := time.NewTicker(r.flushMaxWait) brokers := []string{r.broker} - connect := func() error { - var err error + for r.producer == nil { r.producer, err = sarama.NewSyncProducer(brokers, r.saramaCfg) if err == sarama.ErrOutOfBrokers { log.Warning("kafkaMdm %q: %s", r.key, err) + // sleep before trying to connect again. + time.Sleep(time.Second) } else if err != nil { log.Fatal(4, "kafkaMdm %q: failed to initialize kafka producer. %s", r.key, err) - } else { - log.Notice("kafkaMdm %q: now connected to kafka", r.key) } - return err } + log.Notice("kafkaMdm %q: now connected to kafka", r.key) // flushes the data to kafka and resets buffer. blocks until it succeeds flush := func() { - for r.producer == nil && connect() != nil { - time.Sleep(time.Second) - } for { pre := time.Now() var err error @@ -194,8 +190,6 @@ func (r *KafkaMdm) run() { case <-ticker.C: if len(metrics) != 0 { flush() - } else if r.producer == nil { - connect() } } }