-
Notifications
You must be signed in to change notification settings - Fork 6
/
kafka.go
68 lines (53 loc) · 1.28 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"os"
)
var (
brokers = []string{"127.0.0.1:9092"}
topic = "xbanku-transactions-t3"
)
func newKafkaConfiguration() *sarama.Config {
conf := sarama.NewConfig()
conf.Producer.RequiredAcks = sarama.WaitForAll
conf.Producer.Return.Successes = true
conf.ChannelBufferSize = 1
conf.Version = sarama.V0_10_1_0
return conf
}
func newKafkaSyncProducer() sarama.SyncProducer {
kafka, err := sarama.NewSyncProducer(brokers, newKafkaConfiguration())
if err != nil {
fmt.Printf("Kafka error: %s\n", err)
os.Exit(-1)
}
return kafka
}
func newKafkaConsumer() sarama.Consumer {
consumer, err := sarama.NewConsumer(brokers, newKafkaConfiguration())
if err != nil {
fmt.Printf("Kafka error: %s\n", err)
os.Exit(-1)
}
return consumer
}
func sendMsg(kafka sarama.SyncProducer, event interface{}) error {
json, err := json.Marshal(event)
if err != nil {
return err
}
msgLog := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(string(json)),
}
partition, offset, err := kafka.SendMessage(msgLog)
if err != nil {
fmt.Printf("Kafka error: %s\n", err)
}
fmt.Printf("Message: %+v\n", event)
fmt.Printf("Message is stored in partition %d, offset %d\n",
partition, offset)
return nil
}