introduce channel_workers
arkady-emelyanov committed Aug 16, 2018
1 parent 02d4d7f commit e00637c
Showing 7 changed files with 53 additions and 20 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -68,6 +68,11 @@ kafkabeat:
 # @see https://github.com/Shopify/sarama/blob/v1.17.0/config.go#L262
 # for detailed explanation
 #channel_buffer_size: 256
+
+# Number of concurrent publish workers.
+# As a general suggestion, keep the number of workers equal to or lower than the number of available CPU cores.
+# Defaults to the number of available cores.
+#channel_workers: 8
 ```
 
 ### Timestamp
39 changes: 26 additions & 13 deletions beater/kafkabeat.go
@@ -13,7 +13,7 @@ import (
     "github.com/elastic/beats/libbeat/beat"
     "github.com/elastic/beats/libbeat/common"
     "github.com/elastic/beats/libbeat/logp"
 )

type msgDecodeFn func(msg *sarama.ConsumerMessage) *beat.Event

@@ -39,11 +39,10 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
     }
 
     kConfig := cluster.NewConfig()
-    kConfig.Group.Return.Notifications = true
-    kConfig.Consumer.Return.Errors = true
     kConfig.ClientID = bConfig.ClientID
-    kConfig.Consumer.MaxWaitTime = time.Millisecond * 500
     kConfig.ChannelBufferSize = bConfig.ChannelBufferSize
+    kConfig.Consumer.MaxWaitTime = time.Millisecond * 500
+    kConfig.Consumer.Return.Errors = true
 
     // initial offset handling
     switch bConfig.Offset {
@@ -79,6 +78,10 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
         return nil, fmt.Errorf("error in configuration, unknown publish_mode: '%s'", bConfig.PublishMode)
     }
 
+    if bConfig.ChannelWorkers < 1 {
+        bConfig.ChannelWorkers = 1
+    }
+
     // return beat
     bt := &Kafkabeat{
         done: make(chan struct{}),
@@ -115,6 +118,12 @@ func (bt *Kafkabeat) Run(b *beat.Beat) error {
         return err
     }
 
+    // run workers
+    bt.logger.Info("spawning channel workers: ", bt.bConfig.ChannelWorkers)
+    for i := 0; i < bt.bConfig.ChannelWorkers; i++ {
+        go bt.workerFn()
+    }
+
     // run loop
     bt.logger.Info("kafkabeat is running! Hit CTRL-C to stop it.")
     for {
@@ -124,18 +133,22 @@ func (bt *Kafkabeat) Run(b *beat.Beat) error {
             return nil
 
         case err := <-bt.consumer.Errors():
-            bt.logger.Errorf("error: %#v", err)
-
-        case notify := <-bt.consumer.Notifications():
-            bt.logger.Debugf("received notification: %#v", notify)
-
-        case msg := <-bt.consumer.Messages():
-            bt.logger.Debugf("received message: %#v", msg)
-            if event := bt.codec(msg); event != nil {
-                bt.pipeline.Publish(*event)
-            }
-            bt.consumer.MarkOffset(msg, "")
+            bt.logger.Error(err.Error())
         }
     }
 }
+
+func (bt *Kafkabeat) workerFn() {
+    for {
+        msg := <-bt.consumer.Messages()
+        if msg == nil {
+            break
+        }
+
+        if event := bt.codec(msg); event != nil {
+            bt.pipeline.Publish(*event)
+        }
+        bt.consumer.MarkOffset(msg, "")
+    }
+}

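For context: the commit replaces the single select-loop consumer with a pool of channel workers. A minimal, self-contained sketch of that fan-out pattern (illustrative names only, not the beat's actual types): several goroutines drain one channel, and each exits when the channel is closed, since a receive on a closed channel of pointers yields nil, the same shutdown check workerFn uses.

```go
package main

import (
	"fmt"
	"sync"
)

type message struct{ value string }

// worker drains msgs until the channel is closed: a receive from a
// closed channel yields the zero value, which for a pointer element
// type is nil, mirroring workerFn's exit condition.
func worker(id int, msgs <-chan *message, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		msg := <-msgs
		if msg == nil {
			break
		}
		fmt.Printf("worker %d handled %q\n", id, msg.value)
	}
}

func main() {
	msgs := make(chan *message, 4)
	var wg sync.WaitGroup

	const workers = 2 // stands in for channel_workers
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker(i, msgs, &wg)
	}

	for _, v := range []string{"a", "b", "c"} {
		msgs <- &message{value: v}
	}
	close(msgs) // every worker unblocks with nil and exits
	wg.Wait()
}
```

The consumer's Messages() channel plays the role of msgs here; unlike this sketch, the beat does not wait on its workers, which simply return once that channel is closed.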
4 changes: 4 additions & 0 deletions config/config.go
@@ -3,6 +3,8 @@
 
 package config
 
+import "runtime"
+
 type Config struct {
     Brokers []string `config:"brokers"`
     Topics  []string `config:"topics"`
@@ -12,6 +14,7 @@ type Config struct {
     Codec             string `config:"codec"`
     PublishMode       string `config:"publish_mode"`
     ChannelBufferSize int    `config:"channel_buffer_size"`
+    ChannelWorkers    int    `config:"channel_workers"`
 }
 
 var DefaultConfig = Config{
@@ -23,4 +26,5 @@ var DefaultConfig = Config{
     Codec:             "json",
     PublishMode:       "default",
     ChannelBufferSize: 256,
+    ChannelWorkers:    runtime.NumCPU(),
 }
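Taken together with the guard added in beater/kafkabeat.go, the defaulting works in two steps: an unset channel_workers picks up runtime.NumCPU() from DefaultConfig, while an explicitly non-positive value is clamped to one worker. A hedged sketch (resolveWorkers and explicitlySet are illustrative, not part of the codebase):

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveWorkers mirrors the commit's two-step defaulting: DefaultConfig
// seeds ChannelWorkers with runtime.NumCPU(), and New() clamps any
// non-positive override to a single worker. The explicitlySet flag is a
// stand-in for whether channel_workers appears in the YAML at all.
func resolveWorkers(configured int, explicitlySet bool) int {
	workers := runtime.NumCPU()
	if explicitlySet {
		workers = configured
	}
	if workers < 1 {
		workers = 1
	}
	return workers
}

func main() {
	fmt.Println(resolveWorkers(0, false)) // unset: one worker per core
	fmt.Println(resolveWorkers(8, true))  // explicit: honored as-is
	fmt.Println(resolveWorkers(-2, true)) // non-positive: clamped to 1
}
```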
3 changes: 0 additions & 3 deletions config/config_test.go

This file was deleted.

6 changes: 2 additions & 4 deletions kafkabeat.docker-compose.yml
@@ -1,14 +1,12 @@
-################### Kafkabeat Configuration Example #########################
-
-############################# Kafkabeat ######################################
 kafkabeat:
   brokers: ["localhost:9092"]
   topics: ["watch"]
   client_id: "beat"
   group: "kafkabeat"
   offset: "newest"
-  codec: "json"
+  codec: "plain"
   channel_buffer_size: 1024
+  channel_workers: 2
 
 output.elasticsearch:
   enabled: true
5 changes: 5 additions & 0 deletions kafkabeat.reference.yml
@@ -37,6 +37,11 @@ kafkabeat:
 # for detailed explanation
 #channel_buffer_size: 256
 
+# Number of concurrent publish workers.
+# As a general suggestion, keep the number of workers equal to or lower than the number of available CPU cores.
+# Defaults to the number of available cores.
+#channel_workers: 1
+
 #================================ General ======================================
 
 # The name of the shipper that publishes the network data. It can be used to group
11 changes: 11 additions & 0 deletions kafkabeat.yml
@@ -31,6 +31,17 @@ kafkabeat:
 # for detailed explanation.
 #publish_mode: "default"
 
+# Channel buffer size.
+# Defaults to 256
+# @see https://github.com/Shopify/sarama/blob/v1.17.0/config.go#L262
+# for detailed explanation
+#channel_buffer_size: 256
+
+# Number of concurrent publish workers.
+# As a general suggestion, keep the number of workers equal to or lower than the number of available CPU cores.
+# Defaults to the number of available cores.
+#channel_workers: 1
+
 #================================ General =====================================
 
 # The name of the shipper that publishes the network data. It can be used to group
