diff --git a/kafkahook/kafkahook.go b/kafkahook/kafkahook.go index c573d35..33b4728 100644 --- a/kafkahook/kafkahook.go +++ b/kafkahook/kafkahook.go @@ -69,6 +69,7 @@ func New(conf Config) (*KafkaHook, error) { kafkaConfig.Producer.Flush.Frequency = 200 * time.Millisecond kafkaConfig.Producer.Retry.Backoff = 10 * time.Second kafkaConfig.Producer.Retry.Max = 6 + kafkaConfig.Producer.Return.Errors = true // If the user failed to provide a producer create one if conf.Producer == nil { @@ -87,8 +88,15 @@ func New(conf Config) (*KafkaHook, error) { go func() { for { select { + case err := <-conf.Producer.Errors(): + msg, _ := err.Msg.Value.Encode() + fmt.Fprintf(os.Stderr, "[kafkahook] produce error '%s' for: %s\n", err.Err, string(msg)) + case buf, ok := <-h.produce: if !ok { + if err := conf.Producer.Close(); err != nil { + fmt.Fprintf(os.Stderr, "[kafkahook] producer close error: %s\n", err) + } h.wg.Done() return } @@ -158,7 +166,7 @@ func (h *KafkaHook) sendKafka(buf []byte) error { default: // If the producer input channel buffer is full, then we better drop // a log record than block program execution. - fmt.Fprintf(os.Stderr, "kafkahook buffer overflow: %s\n", string(buf)) + fmt.Fprintf(os.Stderr, "[kafkahook] buffer overflow: %s\n", string(buf)) } return nil } @@ -198,7 +206,6 @@ func (h *KafkaHook) Close() error { h.once.Do(func() { close(h.produce) h.wg.Wait() - err = h.conf.Producer.Close() }) return err }