diff --git a/README.md b/README.md
index 3086f4fd..f8874e76 100644
--- a/README.md
+++ b/README.md
@@ -68,6 +68,11 @@ kafkabeat:
   # @see https://github.com/Shopify/sarama/blob/v1.17.0/config.go#L262
   # for detailed explanation
   #channel_buffer_size: 256
+
+  # Number of concurrent publish workers.
+  # As a general suggestion, keep the number of workers at or below the number of available CPU cores.
+  # Defaults to the number of available cores
+  #channel_workers: 8
 ```

 ### Timestamp
diff --git a/beater/kafkabeat.go b/beater/kafkabeat.go
index 72acd21d..19f705b9 100644
--- a/beater/kafkabeat.go
+++ b/beater/kafkabeat.go
@@ -13,7 +13,7 @@ import (
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
-)
+ )

 type msgDecodeFn func(msg *sarama.ConsumerMessage) *beat.Event

@@ -39,11 +39,10 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
 	}

 	kConfig := cluster.NewConfig()
-	kConfig.Group.Return.Notifications = true
-	kConfig.Consumer.Return.Errors = true
 	kConfig.ClientID = bConfig.ClientID
-	kConfig.Consumer.MaxWaitTime = time.Millisecond * 500
 	kConfig.ChannelBufferSize = bConfig.ChannelBufferSize
+	kConfig.Consumer.MaxWaitTime = time.Millisecond * 500
+	kConfig.Consumer.Return.Errors = true

 	// initial offset handling
 	switch bConfig.Offset {
@@ -79,6 +78,10 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
 		return nil, fmt.Errorf("error in configuration, unknown publish_mode: '%s'", bConfig.PublishMode)
 	}

+	if bConfig.ChannelWorkers < 1 {
+		bConfig.ChannelWorkers = 1
+	}
+
 	// return beat
 	bt := &Kafkabeat{
 		done: make(chan struct{}),
@@ -115,6 +118,12 @@ func (bt *Kafkabeat) Run(b *beat.Beat) error {
 		return err
 	}

+	// run workers
+	bt.logger.Info("spawning channel workers: ", bt.bConfig.ChannelWorkers)
+	for i := 0; i < bt.bConfig.ChannelWorkers; i++ {
+		go bt.workerFn()
+	}
+
 	// run loop
 	bt.logger.Info("kafkabeat is running! Hit CTRL-C to stop it.")
 	for {
@@ -124,18 +133,22 @@ func (bt *Kafkabeat) Run(b *beat.Beat) error {
 			return nil

 		case err := <-bt.consumer.Errors():
-			bt.logger.Errorf("error: %#v", err)
+			bt.logger.Error(err.Error())
+		}
+	}
+}

-		case notify := <-bt.consumer.Notifications():
-			bt.logger.Debugf("received notification: %#v", notify)
+func (bt *Kafkabeat) workerFn() {
+	for {
+		msg := <-bt.consumer.Messages()
+		if msg == nil {
+			break
+		}

-		case msg := <-bt.consumer.Messages():
-			bt.logger.Debugf("received message: %#v", msg)
-			if event := bt.codec(msg); event != nil {
-				bt.pipeline.Publish(*event)
-			}
-			bt.consumer.MarkOffset(msg, "")
+		if event := bt.codec(msg); event != nil {
+			bt.pipeline.Publish(*event)
 		}
+		bt.consumer.MarkOffset(msg, "")
 	}
 }

diff --git a/config/config.go b/config/config.go
index bf0154d0..17ac8a40 100644
--- a/config/config.go
+++ b/config/config.go
@@ -3,6 +3,8 @@

 package config

+import "runtime"
+
 type Config struct {
 	Brokers           []string `config:"brokers"`
 	Topics            []string `config:"topics"`
@@ -12,6 +14,7 @@ type Config struct {
 	Codec             string   `config:"codec"`
 	PublishMode       string   `config:"publish_mode"`
 	ChannelBufferSize int      `config:"channel_buffer_size"`
+	ChannelWorkers    int      `config:"channel_workers"`
 }

 var DefaultConfig = Config{
@@ -23,4 +26,5 @@ var DefaultConfig = Config{
 	Codec:             "json",
 	PublishMode:       "default",
 	ChannelBufferSize: 256,
+	ChannelWorkers:    runtime.NumCPU(),
 }
diff --git a/config/config_test.go b/config/config_test.go
deleted file mode 100644
index d177de3a..00000000
--- a/config/config_test.go
+++ /dev/null
@@ -1,3 +0,0 @@
-// +build !integration
-
-package config
diff --git a/kafkabeat.docker-compose.yml b/kafkabeat.docker-compose.yml
index 6b5d5c49..eb193071 100644
--- a/kafkabeat.docker-compose.yml
+++ b/kafkabeat.docker-compose.yml
@@ -1,14 +1,12 @@
-################### Kafkabeat Configuration Example #########################
-
-############################# Kafkabeat ######################################
 kafkabeat:
   brokers: ["localhost:9092"]
   topics: ["watch"]
   client_id: "beat"
   group: "kafkabeat"
   offset: "newest"
-  codec: "json"
+  codec: "plain"
   channel_buffer_size: 1024
+  channel_workers: 2

 output.elasticsearch:
   enabled: true
diff --git a/kafkabeat.reference.yml b/kafkabeat.reference.yml
index c6a73e36..6352282f 100644
--- a/kafkabeat.reference.yml
+++ b/kafkabeat.reference.yml
@@ -37,6 +37,11 @@ kafkabeat:
   # for detailed explanation
   #channel_buffer_size: 256

+  # Number of concurrent publish workers.
+  # As a general suggestion, keep the number of workers at or below the number of available CPU cores.
+  # Defaults to the number of available cores
+  #channel_workers: 1
+
 #================================ General ======================================

 # The name of the shipper that publishes the network data. It can be used to group
diff --git a/kafkabeat.yml b/kafkabeat.yml
index bf51b0be..816af11a 100644
--- a/kafkabeat.yml
+++ b/kafkabeat.yml
@@ -31,6 +31,17 @@ kafkabeat:
   # for detailed explanation.
   #publish_mode: "default"

+  # Channel buffer size.
+  # Defaults to 256
+  # @see https://github.com/Shopify/sarama/blob/v1.17.0/config.go#L262
+  # for detailed explanation
+  #channel_buffer_size: 256
+
+  # Number of concurrent publish workers.
+  # As a general suggestion, keep the number of workers at or below the number of available CPU cores.
+  # Defaults to the number of available cores
+  #channel_workers: 1
+
 #================================ General =====================================

 # The name of the shipper that publishes the network data. It can be used to group
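
For context, the patch replaces the single select-based consume loop with a pool of `channel_workers` goroutines that all receive from the same `Messages()` channel. The `msg == nil` exit check works because a receive from a closed channel yields the element type's zero value, which for a pointer type is `nil`. Below is a minimal, self-contained sketch of that fan-out pattern; the `message` type, `worker` function, and `channelWorkers` constant are illustrative stand-ins, not part of the patch:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for *sarama.ConsumerMessage; any pointer element
// type works here, because a receive from a closed channel yields the
// zero value, which for a pointer is nil.
type message struct{ value string }

// worker mirrors the shape of the patch's workerFn: loop, receive,
// publish, and exit once the channel is closed and drained.
func worker(id int, messages <-chan *message, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		msg := <-messages
		if msg == nil { // channel closed: nil signals shutdown
			return
		}
		fmt.Printf("worker %d published %q\n", id, msg.value)
	}
}

func main() {
	const channelWorkers = 2 // plays the role of the channel_workers setting
	messages := make(chan *message, 4)

	var wg sync.WaitGroup
	for i := 0; i < channelWorkers; i++ {
		wg.Add(1)
		go worker(i, messages, &wg)
	}

	for _, v := range []string{"a", "b", "c"} {
		messages <- &message{value: v}
	}
	close(messages) // buffered values are drained first, then receives yield nil
	wg.Wait()
}
```

One trade-off worth noting: once more than one worker drains a single channel, messages are processed concurrently, so strict in-order processing within a partition (and in-order `MarkOffset` calls) is no longer guaranteed. That is the usual cost of this fan-out pattern, and it is why the documentation's suggestion to cap `channel_workers` at the CPU core count matters mostly for throughput, not correctness.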