Skip to content

Commit

Permalink
Merge pull request #14 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
Added producer reconnect logic to kafkahook
  • Loading branch information
thrawn01 authored Oct 10, 2018
2 parents be52772 + 09d8c3a commit 062756c
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions kafkahook/kafkahook.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func New(conf Config) (*KafkaHook, error) {
kafkaConfig.Producer.Flush.Frequency = 200 * time.Millisecond
kafkaConfig.Producer.Retry.Backoff = 10 * time.Second
kafkaConfig.Producer.Retry.Max = 6
kafkaConfig.Producer.Return.Errors = true

// If the user failed to provide a producer create one
if conf.Producer == nil {
Expand All @@ -87,8 +88,15 @@ func New(conf Config) (*KafkaHook, error) {
go func() {
for {
select {
case err := <-conf.Producer.Errors():
msg, _ := err.Msg.Value.Encode()
fmt.Fprintf(os.Stderr, "[kafkahook] produce error '%s' for: %s\n", err.Err, string(msg))

case buf, ok := <-h.produce:
if !ok {
if err := conf.Producer.Close(); err != nil {
fmt.Fprintf(os.Stderr, "[kafkahook] producer close error: %s\n", err)
}
h.wg.Done()
return
}
Expand Down Expand Up @@ -158,7 +166,7 @@ func (h *KafkaHook) sendKafka(buf []byte) error {
default:
// If the producer input channel buffer is full, then we better drop
// a log record than block program execution.
fmt.Fprintf(os.Stderr, "kafkahook buffer overflow: %s\n", string(buf))
fmt.Fprintf(os.Stderr, "[kafkahook] buffer overflow: %s\n", string(buf))
}
return nil
}
Expand Down Expand Up @@ -198,7 +206,6 @@ func (h *KafkaHook) Close() error {
h.once.Do(func() {
close(h.produce)
h.wg.Wait()
err = h.conf.Producer.Close()
})
return err
}
Expand Down

0 comments on commit 062756c

Please sign in to comment.