Skip to content

Commit

Permalink
remove unnecessary connect func literal
Browse files Browse the repository at this point in the history
  • Loading branch information
woodsaj authored and Dieterbe committed Feb 16, 2017
1 parent a533123 commit 7851e7a
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions route/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,24 +111,20 @@ func (r *KafkaMdm) run() {
ticker := time.NewTicker(r.flushMaxWait)
brokers := []string{r.broker}

connect := func() error {
var err error
for r.producer == nil {
r.producer, err = sarama.NewSyncProducer(brokers, r.saramaCfg)
if err == sarama.ErrOutOfBrokers {
log.Warning("kafkaMdm %q: %s", r.key, err)
// sleep before trying to connect again.
time.Sleep(time.Second)
} else if err != nil {
log.Fatal(4, "kafkaMdm %q: failed to initialize kafka producer. %s", r.key, err)
} else {
log.Notice("kafkaMdm %q: now connected to kafka", r.key)
}
return err
}
log.Notice("kafkaMdm %q: now connected to kafka", r.key)

// flushes the data to kafka and resets buffer. blocks until it succeeds
flush := func() {
for r.producer == nil && connect() != nil {
time.Sleep(time.Second)
}
for {
pre := time.Now()
var err error
Expand Down Expand Up @@ -194,8 +190,6 @@ func (r *KafkaMdm) run() {
case <-ticker.C:
if len(metrics) != 0 {
flush()
} else if r.producer == nil {
connect()
}
}
}
Expand Down

0 comments on commit 7851e7a

Please sign in to comment.