-
Notifications
You must be signed in to change notification settings - Fork 7
/
kafka_test.go
105 lines (86 loc) · 2.06 KB
/
kafka_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package anyq
import (
"github.com/wvanbergen/kafka/consumergroup"
"gopkg.in/Shopify/sarama.v1"
"testing"
)
func TestKafkaSetZookeeper(t *testing.T) {
q, err := New("kafka", "localhost:9092")
if err != nil {
t.Error(err)
}
_, err = q.NewConsumer(KafkaConsumerArgs{Topic: "test", Partitions: "all", Group: "group1"})
if err == nil {
t.Error("zookeeper is required for group consumer")
}
}
func TestKafkaSingleConsumer(t *testing.T) {
q, err := New("kafka", "localhost:9092")
if err != nil {
t.Error(err)
}
c, err := q.NewConsumer(KafkaConsumerArgs{Topic: "test", Partitions: "all"})
if err != nil {
t.Error(err)
}
consumer, err := c.Consumer()
if err != nil {
t.Error(err)
}
if _, ok := consumer.(sarama.Consumer); !ok {
t.Errorf("invalid consumer type(%T)\n", consumer)
}
}
func TestKafkaGroupConsumer(t *testing.T) {
q, err := New("kafka", "localhost:9092", func(q *Kafka) {
q.Zookeepers = []string{"localhost:2181"}
})
if err != nil {
t.Error(err)
}
c, err := q.NewConsumer(KafkaConsumerArgs{Topic: "test", Partitions: "all", Group: "group1"})
if err != nil {
t.Error(err)
}
consumer, err := c.Consumer()
if err != nil {
t.Error(err)
}
if _, ok := consumer.(*consumergroup.ConsumerGroup); !ok {
t.Errorf("invalid consumer type(%T)\n", consumer)
}
}
func TestKafkaSyncProducer(t *testing.T) {
q, err := New("kafka", "localhost:9092")
if err != nil {
t.Error(err)
}
p, err := q.NewProducer(KafkaProducerArgs{Topic: "test", Sync: true})
if err != nil {
t.Error(err)
}
producer, err := p.Producer()
if err != nil {
t.Error(err)
}
if _, ok := producer.(sarama.SyncProducer); !ok {
t.Errorf("invalid producer type(%T)\n", producer)
}
}
func TestKafkaAsyncProducer(t *testing.T) {
q, err := New("kafka", "localhost:9092")
if err != nil {
t.Error(err)
}
p, err := q.NewProducer(KafkaProducerArgs{Topic: "test"})
if err != nil {
t.Error(err)
}
producer, err := p.Producer()
if err != nil {
t.Error(err)
}
if _, ok := producer.(sarama.AsyncProducer); !ok {
t.Errorf("invalid producer type(%T)\n", producer)
}
}