From 330bf511957db829f9f586f136760680e6071139 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 19 Jan 2018 17:26:53 -0600 Subject: [PATCH] Added a buffer to sarama producer when sending log messages --- kafkahook/kafkahook.go | 47 +++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/kafkahook/kafkahook.go b/kafkahook/kafkahook.go index 40f61c9..0a66087 100644 --- a/kafkahook/kafkahook.go +++ b/kafkahook/kafkahook.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/Shopify/sarama" @@ -17,13 +18,19 @@ import ( "github.com/sirupsen/logrus" ) +const bufferSize = 150 + type KafkaHook struct { - producer sarama.AsyncProducer + produce chan []byte hostName string appName string - topic string + conf Config pid int debug bool + + // Sync stuff + wg sync.WaitGroup + once sync.Once } type Config struct { @@ -51,10 +58,28 @@ func New(conf Config) (*KafkaHook, error) { } h := KafkaHook{ - producer: conf.Producer, - topic: conf.Topic, + produce: make(chan []byte, bufferSize), + conf: conf, } + h.wg.Add(1) + go func() { + for { + select { + case buf, ok := <-h.produce: + if !ok { + h.wg.Done() + return + } + conf.Producer.Input() <- &sarama.ProducerMessage{ + Value: sarama.ByteEncoder(buf), + Topic: conf.Topic, + Key: nil, + } + } + } + }() + if h.hostName, err = os.Hostname(); err != nil { h.hostName = "unknown_host" } @@ -105,11 +130,7 @@ func (h *KafkaHook) Fire(entry *logrus.Entry) error { func (h *KafkaHook) sendKafka(buf []byte) error { select { - case h.producer.Input() <- &sarama.ProducerMessage{ - Value: sarama.ByteEncoder(buf), - Topic: h.topic, - Key: nil, - }: + case h.produce <- buf: default: // If the producer input channel buffer is full, then we better drop // a log record than block program execution. @@ -149,5 +170,11 @@ func (h *KafkaHook) SetDebug(set bool) { // Close the kakfa producer and flush any remaining logs func (h *KafkaHook) Close() error { - return h.producer.Close() + var err error + h.once.Do(func() { + close(h.produce) + h.wg.Wait() + err = h.conf.Producer.Close() + }) + return err }