-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.go
123 lines (107 loc) · 3.31 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"io"
"os"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/asaskevich/govalidator"
"github.com/segment-sources/mongodb/lib"
"github.com/segmentio/objects-go"
"github.com/segmentio/ecs-logs-go/logrus"
"github.com/tj/docopt"
)
const (
Version = "v0.1.5-beta"
Usage = `
Usage:
mongodb
[--debug]
[--init]
[--json-log]
[--concurrency=<c>]
[--schema=<schema-path>]
[--write-key=<segment-write-key>]
--hostname=<hostname>
--port=<port>
--username=<username>
--password=<password>
--database=<database>
mongodb -h | --help
mongodb --version
Options:
"github.com/segmentio/source-db-lib/internal/domain"
-h --help Show this screen
--version Show version
[--debug] Set logrus level to .DebugLevel
[--json-log] Format log as JSON. Useful for ecs-logs for example
--write-key=<key> Segment source write key
--concurrency=<c> Number of concurrent table scans [default: 1]
--hostname=<hostname> Database instance hostname
--port=<port> Database instance port number
--username=<username> Database instance username
--password=<password> Database instance password
--database=<database> Database instance name
--schema=<schema-path> The path to the schema json file [default: schema.json]
`
)
func main() {
m, err := docopt.Parse(Usage, nil, true, Version, false)
if err != nil {
logrus.Fatal(err)
}
if m["--debug"].(bool) {
logrus.SetLevel(logrus.DebugLevel)
}
if m["--json-log"].(bool) {
logrus.SetFormatter(logrus_ecslogs.NewFormatter())
}
concurrency, err := strconv.Atoi(m["--concurrency"].(string))
if err != nil {
logrus.Fatal(err)
}
// Load and validate DB configuration.
config := &mongodb.Config{
Init: m["--init"].(bool),
Hostname: m["--hostname"].(string),
Port: m["--port"].(string),
Username: m["--username"].(string),
Password: m["--password"].(string),
Database: m["--database"].(string),
}
_, err = govalidator.ValidateStruct(config)
if err != nil {
logrus.Error(err)
return
}
// If in init mode, save list of collections to schema file. Users will then have to modify the
// file and fill in fields they want to export to their Segment warehouse.
fileName := m["--schema"].(string)
if config.Init {
mongodb.InitSchema(config, fileName)
return
}
// Build Segment client and define publish function for when we scan over the collections.
writeKey := m["--write-key"].(string)
if writeKey == "" {
logrus.Fatal("Write key is required when not in init mode.")
}
description, err := mongodb.ParseSchema(fileName)
if err == io.EOF {
logrus.Error("Empty schema, did you run `--init`?")
} else if err != nil {
logrus.Fatal("Unable to parse schema", err)
}
segmentClient := objects.New(writeKey)
defer segmentClient.Close()
setWrapperFunc := func(o *objects.Object) {
err := segmentClient.Set(o)
if err != nil {
logrus.WithFields(logrus.Fields{"id": o.ID, "collection": o.Collection, "properties": o.Properties}).Warn(err)
}
}
logrus.Info("[%v] Mongo source started with writeKey ", Version, writeKey)
if err := mongodb.Run(config, description, concurrency, setWrapperFunc); err != nil {
logrus.Error("mongodb source failed to complete", err)
os.Exit(1)
}
}