-
Notifications
You must be signed in to change notification settings - Fork 3
/
kafka_consumer_collection.go
124 lines (107 loc) · 2.88 KB
/
kafka_consumer_collection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package consumer
import (
"context"
"errors"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/inviqa/kafka-consumer-go/config"
"github.com/inviqa/kafka-consumer-go/data/failure/model"
"github.com/inviqa/kafka-consumer-go/log"
)
// failureProducer is the dependency the consumer collection uses to start
// failure handling. Only the method below is required by this file; the
// concrete implementation is declared elsewhere.
type failureProducer interface {
// listenForFailures is invoked once by kafkaConsumerCollection.start, after
// every consumer group has been started, with the same ctx and wg that the
// consumers received.
// NOTE(review): presumably this spawns a goroutine tracked by wg and stops
// when ctx is cancelled — confirm against the implementation.
listenForFailures(ctx context.Context, wg *sync.WaitGroup)
}
// kafkaConsumerCollection owns one Kafka consumer group per configured topic
// and starts/closes them together.
type kafkaConsumerCollection struct {
// cfg supplies ConsumableTopics and is passed to connectToKafka.
cfg *config.Config
// consumers holds the groups opened by start; close closes and clears them.
consumers []sarama.ConsumerGroup
// producer has listenForFailures called on it at the end of start.
producer failureProducer
// handler is passed to every ConsumerGroup.Consume call.
handler sarama.ConsumerGroupHandler
// saramaCfg is passed to connectToKafka when opening each group.
saramaCfg *sarama.Config
// logger receives info and error messages; never nil when built through
// newKafkaConsumerCollection.
logger log.Logger
// connectToKafka opens a new consumer group connection; called once per topic.
connectToKafka kafkaConnector
}
// newKafkaConsumerCollection builds a collection that has not yet opened any
// consumer groups.
//
// A nil logger is replaced with log.NullLogger{}. The sarama handler shared by
// all groups is created here via newConsumer from fch, cfg, hm and the
// (possibly substituted) logger. connector is the function later used to open
// each consumer group.
func newKafkaConsumerCollection(
	cfg *config.Config,
	p failureProducer,
	fch chan model.Failure,
	hm HandlerMap,
	scfg *sarama.Config,
	logger log.Logger,
	connector kafkaConnector,
) *kafkaConsumerCollection {
	// Fall back to a no-op logger so later calls never dereference nil.
	if logger == nil {
		logger = log.NullLogger{}
	}

	collection := new(kafkaConsumerCollection)
	collection.cfg = cfg
	collection.consumers = []sarama.ConsumerGroup{}
	collection.producer = p
	collection.handler = newConsumer(fch, cfg, hm, logger)
	collection.saramaCfg = scfg
	collection.logger = logger
	collection.connectToKafka = connector

	return collection
}
// start opens one consumer group per entry in cfg.ConsumableTopics and then
// asks the failure producer to begin listening.
//
// It returns an error when no topics are configured (nil or empty), or the
// first error returned while opening a consumer group. Groups opened before a
// failure remain in cc.consumers so that close can still release them.
func (cc *kafkaConsumerCollection) start(ctx context.Context, wg *sync.WaitGroup) error {
	topics := cc.cfg.ConsumableTopics
	// len covers both a nil and an empty-but-allocated collection; either way
	// there is nothing to consume from.
	if len(topics) == 0 {
		return errors.New("no Kafka topics are configured, therefore cannot start consumers")
	}

	for _, t := range topics {
		group, err := cc.startConsumerGroup(ctx, wg, t)
		if err != nil {
			return err
		}
		cc.consumers = append(cc.consumers, group)
	}

	cc.producer.listenForFailures(ctx, wg)

	return nil
}
// close closes every consumer group held by the collection and then resets
// the list to empty. A failure to close one group is logged and does not stop
// the remaining groups from being closed.
func (cc *kafkaConsumerCollection) close() {
	for _, c := range cc.consumers {
		if err := c.Close(); err != nil {
			// %w is only meaningful to fmt.Errorf; in a logger format string it
			// renders as "%!w(...)", so format the error as text instead.
			cc.logger.Errorf("error occurred closing a Kafka consumer: %s", err)
		}
	}
	cc.consumers = []sarama.ConsumerGroup{}
}
// startConsumerGroup opens a consumer group connection through
// cc.connectToKafka and launches the consume loop for the given topic.
//
// It returns the opened group, or nil and the connection error if connecting
// fails (in which case no consume loop is started).
func (cc *kafkaConsumerCollection) startConsumerGroup(ctx context.Context, wg *sync.WaitGroup, topic *config.KafkaTopic) (sarama.ConsumerGroup, error) {
	cc.logger.Infof("starting Kafka consumer group for '%s'", topic.Name)

	group, connErr := cc.connectToKafka(cc.cfg, cc.saramaCfg, cc.logger)
	if connErr != nil {
		return nil, connErr
	}

	// The connection succeeded, so hand the group to the background loop.
	cc.startConsumer(group, ctx, wg, topic)

	return group, nil
}
// startConsumer launches two goroutines for the consumer group cl:
//
//   - one that logs every error received from cl.Errors() until that channel
//     is closed (not tracked by wg);
//   - one, tracked by wg, that waits topic.Delay, calls cl.Consume for
//     topic.Name with the shared handler, and repeats (waiting topic.Delay
//     again each time Consume returns) until ctx is cancelled.
//
// Errors returned by Consume are logged and do not end the loop.
func (cc *kafkaConsumerCollection) startConsumer(cl sarama.ConsumerGroup, ctx context.Context, wg *sync.WaitGroup, topic *config.KafkaTopic) {
	go func() {
		for err := range cl.Errors() {
			// %w is only meaningful to fmt.Errorf; use %s so the logger prints
			// the error text rather than "%!w(...)".
			cc.logger.Errorf("error occurred in consumer group Handler: %s", err)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		timer := time.NewTimer(topic.Delay)
		// The goroutine never reuses the timer after returning, so stopping it
		// on exit is sufficient; no channel drain is needed.
		defer timer.Stop()

		for {
			select {
			case <-timer.C:
				if err := cl.Consume(ctx, []string{topic.Name}, cc.handler); err != nil {
					cc.logger.Errorf("error when consuming from Kafka: %s", err)
				}
				// Consume also returns when ctx is cancelled; stop instead of
				// scheduling another attempt.
				if ctx.Err() != nil {
					return
				}
				// The timer has fired and its channel was just received from,
				// so Reset is safe here.
				timer.Reset(topic.Delay)
			case <-ctx.Done():
				return
			}
		}
	}()
}