From f83042b244e9a3a412265f59b5918d1e24072c8f Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 4 Oct 2018 15:32:10 -0500 Subject: [PATCH 1/2] Added producer reconnect logic to kafkahook --- kafkahook/kafkahook.go | 42 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/kafkahook/kafkahook.go b/kafkahook/kafkahook.go index c573d35..ca9ca64 100644 --- a/kafkahook/kafkahook.go +++ b/kafkahook/kafkahook.go @@ -69,6 +69,7 @@ func New(conf Config) (*KafkaHook, error) { kafkaConfig.Producer.Flush.Frequency = 200 * time.Millisecond kafkaConfig.Producer.Retry.Backoff = 10 * time.Second kafkaConfig.Producer.Retry.Max = 6 + kafkaConfig.Producer.Return.Errors = true // If the user failed to provide a producer create one if conf.Producer == nil { @@ -87,9 +88,47 @@ func New(conf Config) (*KafkaHook, error) { go func() { for { select { + case prodErr := <-conf.Producer.Errors(): + fmt.Fprintf(os.Stderr, "[kafkahook] producer error '%s'; reconnecting producer...\n", prodErr.Err) + + // Close any previously open producers + if conf.Producer != nil { + // Since `Producer.Return.Errors` is true, `Producer.Close()` will send a shutdown error which + // could cause `Close()` to block unless we read it. Also there could be other errors + // that are waiting to be received and must be sent before shutdown is complete. + go func() { + for range conf.Producer.Errors() {} + }() + + conf.Producer.Close() + conf.Producer = nil + } + + var backOff int + for { + // Attempt to reconnect to the brokers + conf.Producer, err = sarama.NewAsyncProducer(conf.Endpoints, kafkaConfig) + if err != nil { + if backOff < 6 { + backOff++ + } + fmt.Fprintf(os.Stderr, "[kafkahook] reconnect error: %s; sleeping (%d)...\n", err, backOff) + time.Sleep(time.Duration(backOff) * time.Second) + continue + } + backOff = 0 + break + } case buf, ok := <-h.produce: if !ok { h.wg.Done() + // See comment above + go func() { + for range conf.Producer.Errors() {} + }() + if err := conf.Producer.Close(); err != nil { + fmt.Fprintf(os.Stderr, "[kafkahook] producer close error: %s\n", err) + } return } conf.Producer.Input() <- &sarama.ProducerMessage{ @@ -158,7 +197,7 @@ func (h *KafkaHook) sendKafka(buf []byte) error { default: // If the producer input channel buffer is full, then we better drop // a log record than block program execution. - fmt.Fprintf(os.Stderr, "kafkahook buffer overflow: %s\n", string(buf)) + fmt.Fprintf(os.Stderr, "[kafkahook] buffer overflow: %s\n", string(buf)) } return nil } @@ -198,7 +237,6 @@ func (h *KafkaHook) Close() error { h.once.Do(func() { close(h.produce) h.wg.Wait() - err = h.conf.Producer.Close() }) return err } From 09d8c3ad9cd4ee9cbb56e687c2f1b54a7e8c9d90 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 5 Oct 2018 11:49:41 -0500 Subject: [PATCH 2/2] drop producer reconnect logic --- kafkahook/kafkahook.go | 39 ++++----------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/kafkahook/kafkahook.go b/kafkahook/kafkahook.go index ca9ca64..33b4728 100644 --- a/kafkahook/kafkahook.go +++ b/kafkahook/kafkahook.go @@ -88,47 +88,16 @@ func New(conf Config) (*KafkaHook, error) { go func() { for { select { - case prodErr := <-conf.Producer.Errors(): - fmt.Fprintf(os.Stderr, "[kafkahook] producer error '%s'; reconnecting producer...\n", prodErr.Err) - - // Close any previously open producers - if conf.Producer != nil { - // Since `Producer.Return.Errors` is true, `Producer.Close()` will send a shutdown error which - // could cause `Close()` to block unless we read it. Also there could be other errors - // that are waiting to be received and must be sent before shutdown is complete. - go func() { - for range conf.Producer.Errors() {} - }() - - conf.Producer.Close() - conf.Producer = nil - } + case err := <-conf.Producer.Errors(): + msg, _ := err.Msg.Value.Encode() + fmt.Fprintf(os.Stderr, "[kafkahook] produce error '%s' for: %s\n", err.Err, string(msg)) - var backOff int - for { - // Attempt to reconnect to the brokers - conf.Producer, err = sarama.NewAsyncProducer(conf.Endpoints, kafkaConfig) - if err != nil { - if backOff < 6 { - backOff++ - } - fmt.Fprintf(os.Stderr, "[kafkahook] reconnect error: %s; sleeping (%d)...\n", err, backOff) - time.Sleep(time.Duration(backOff) * time.Second) - continue - } - backOff = 0 - break - } case buf, ok := <-h.produce: if !ok { - h.wg.Done() - // See comment above - go func() { - for range conf.Producer.Errors() {} - }() if err := conf.Producer.Close(); err != nil { fmt.Fprintf(os.Stderr, "[kafkahook] producer close error: %s\n", err) } + h.wg.Done() return } conf.Producer.Input() <- &sarama.ProducerMessage{