Skip to content

Commit

Permalink
Merge pull request #11 from mailgun/improve-buffer-overflow
Browse files Browse the repository at this point in the history
Added a buffer to sarama producer when sending log messages
  • Loading branch information
thrawn01 authored Jan 23, 2018
2 parents e1f5c6d + 330bf51 commit 901b4e6
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions kafkahook/kafkahook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
Expand All @@ -17,13 +18,19 @@ import (
"github.com/sirupsen/logrus"
)

const bufferSize = 150

type KafkaHook struct {
producer sarama.AsyncProducer
produce chan []byte
hostName string
appName string
topic string
conf Config
pid int
debug bool

// Sync stuff
wg sync.WaitGroup
once sync.Once
}

type Config struct {
Expand Down Expand Up @@ -51,10 +58,28 @@ func New(conf Config) (*KafkaHook, error) {
}

h := KafkaHook{
producer: conf.Producer,
topic: conf.Topic,
produce: make(chan []byte, bufferSize),
conf: conf,
}

h.wg.Add(1)
go func() {
for {
select {
case buf, ok := <-h.produce:
if !ok {
h.wg.Done()
return
}
conf.Producer.Input() <- &sarama.ProducerMessage{
Value: sarama.ByteEncoder(buf),
Topic: conf.Topic,
Key: nil,
}
}
}
}()

if h.hostName, err = os.Hostname(); err != nil {
h.hostName = "unknown_host"
}
Expand Down Expand Up @@ -105,11 +130,7 @@ func (h *KafkaHook) Fire(entry *logrus.Entry) error {

func (h *KafkaHook) sendKafka(buf []byte) error {
select {
case h.producer.Input() <- &sarama.ProducerMessage{
Value: sarama.ByteEncoder(buf),
Topic: h.topic,
Key: nil,
}:
case h.produce <- buf:
default:
// If the producer input channel buffer is full, then we better drop
// a log record than block program execution.
Expand Down Expand Up @@ -149,5 +170,11 @@ func (h *KafkaHook) SetDebug(set bool) {

// Close the kakfa producer and flush any remaining logs
func (h *KafkaHook) Close() error {
return h.producer.Close()
var err error
h.once.Do(func() {
close(h.produce)
h.wg.Wait()
err = h.conf.Producer.Close()
})
return err
}

0 comments on commit 901b4e6

Please sign in to comment.