Skip to content

Commit

Permalink
fix: only sleep when connect() errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Feb 16, 2017
1 parent b66e994 commit a533123
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions route/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *KafkaMdm) run() {
ticker := time.NewTicker(r.flushMaxWait)
brokers := []string{r.broker}

connect := func() {
connect := func() error {
var err error
r.producer, err = sarama.NewSyncProducer(brokers, r.saramaCfg)
if err == sarama.ErrOutOfBrokers {
Expand All @@ -121,12 +121,12 @@ func (r *KafkaMdm) run() {
} else {
log.Notice("kafkaMdm %q: now connected to kafka", r.key)
}
return err
}

// flushes the data to kafka and resets buffer. blocks until it succeeds
flush := func() {
for r.producer == nil {
connect()
for r.producer == nil && connect() != nil {
time.Sleep(time.Second)
}
for {
Expand Down

0 comments on commit a533123

Please sign in to comment.