-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.go
100 lines (92 loc) · 2.35 KB
/
example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/obgnail/mysql-river/handler/elasticsearch"
"github.com/obgnail/mysql-river/handler/kafka"
"github.com/obgnail/mysql-river/handler/trace_log"
"github.com/obgnail/mysql-river/river"
"time"
)
var config = &river.Config{
MySQLConfig: &river.MySQLConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "root",
},
PosAutoSaverConfig: &river.PosAutoSaverConfig{
SaveDir: "./",
SaveInterval: 3 * time.Second,
},
HealthCheckerConfig: &river.HealthCheckerConfig{
CheckPosThreshold: 3000,
CheckInterval: 5 * time.Second,
},
}
func PanicIfError(err error) {
if err != nil {
panic(err)
}
}
func Kafka() {
kafkaConfig := &kafka.Config{
Addrs: []string{"127.0.0.1:9092"},
Topic: "binlog",
OffsetStoreDir: "./",
Offset: nil,
UseOldestOffset: false,
}
handler, err := kafka.New(kafkaConfig)
PanicIfError(err)
go handler.Consume(func(msg *sarama.ConsumerMessage) error {
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n",
msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
return nil
})
err = handler.Pipe(river.New(config), river.FromFile)
PanicIfError(err)
}
func TraceLog() {
traceConfig := &trace_log.Config{
DBs: []string{"testdb01"},
EntireFields: false,
ShowTxMsg: true,
Highlight: true,
}
handler := trace_log.New(traceConfig)
err := river.New(config).SetHandler(handler).Sync(river.FromDB) // 从最新位置开始解析
PanicIfError(err)
}
func ElasticSearch() {
handlerConfig := &elasticsearch.EsHandlerConfig{
Host: "127.0.0.1",
Port: 9200,
User: "",
Password: "",
BulkSize: 128,
FlushInterval: time.Second,
SkipNoPkTable: true,
Rules: []*elasticsearch.Rule{
elasticsearch.NewDefaultRule("testdb01", "user"),
},
}
handler := elasticsearch.New(handlerConfig)
err := river.New(config).SetHandler(handler).Sync(river.FromDB) // 从最新位置开始解析
PanicIfError(err)
}
func Base() {
err := river.New(config).
SetHandler(river.NopCloserAlerter(func(event *river.EventData) error {
fmt.Println(event.EventType, event.LogName, event.LogPos, event.Before, event.After)
return nil
})).
Sync(river.FromDB)
PanicIfError(err)
}
func main() {
//Base()
//TraceLog()
Kafka()
//ElasticSearch()
}