Skip to content

Commit dec628b

Browse files
marctcrfratto
authored and committed
Refactor collectors to not depend on kingpin to be configured.
This commit removes references to kingpin.CommandLine, allowing for the collector package to be used and configured with a custom kingpin (or no kingpin at all). The configuration for collectors has been moved to struct fields, which the kingpin flags populate at flag parse time. Co-authored-by: Robert Fratto <[email protected]> Signed-off-by: Marc Tuduri <[email protected]>
1 parent a1503cf commit dec628b

16 files changed

+246
-171
lines changed

Diff for: collector/exporter.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package collector
1616
import (
1717
"context"
1818
"fmt"
19+
"regexp"
1920
"strings"
2021
"sync"
2122
"time"
@@ -43,15 +44,26 @@ const (
4344

4445
// Tunable flags.
4546
var (
46-
exporterLockTimeout = kingpin.Flag(
47+
versionRE = regexp.MustCompile(`^\d+\.\d+`)
48+
)
49+
50+
// Config holds configuration options for the exporter.
51+
type Config struct {
52+
LockTimeout int
53+
SlowLogFilter bool
54+
}
55+
56+
// RegisterFlags adds flags to configure the exporter.
57+
func (c *Config) RegisterFlags(application *kingpin.Application) {
58+
application.Flag(
4759
"exporter.lock_wait_timeout",
4860
"Set a lock_wait_timeout (in seconds) on the connection to avoid long metadata locking.",
49-
).Default("2").Int()
50-
slowLogFilter = kingpin.Flag(
61+
).Default("2").IntVar(&c.LockTimeout)
62+
application.Flag(
5163
"exporter.log_slow_filter",
5264
"Add a log_slow_filter to avoid slow query logging of scrapes. NOTE: Not supported by Oracle MySQL.",
53-
).Default("false").Bool()
54-
)
65+
).Default("false").BoolVar(&c.SlowLogFilter)
66+
}
5567

5668
// metric definition
5769
var (
@@ -87,11 +99,11 @@ type Exporter struct {
8799
}
88100

89101
// New returns a new MySQL exporter for the provided DSN.
90-
func New(ctx context.Context, dsn string, scrapers []Scraper, logger log.Logger) *Exporter {
102+
func New(ctx context.Context, dsn string, scrapers []Scraper, logger log.Logger, cfg Config) *Exporter {
91103
// Setup extra params for the DSN, default to having a lock timeout.
92-
dsnParams := []string{fmt.Sprintf(timeoutParam, *exporterLockTimeout)}
104+
dsnParams := []string{fmt.Sprintf(timeoutParam, cfg.LockTimeout)}
93105

94-
if *slowLogFilter {
106+
if cfg.SlowLogFilter {
95107
dsnParams = append(dsnParams, sessionSettingsParam)
96108
}
97109

Diff for: collector/exporter_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"testing"
1919

20+
"github.com/alecthomas/kingpin/v2"
2021
"github.com/go-kit/log"
2122
"github.com/prometheus/client_golang/prometheus"
2223
"github.com/prometheus/common/model"
@@ -30,13 +31,22 @@ func TestExporter(t *testing.T) {
3031
t.Skip("-short is passed, skipping test")
3132
}
3233

34+
var exporterConfig Config
35+
kingpinApp := kingpin.New("TestExporter", "")
36+
exporterConfig.RegisterFlags(kingpinApp)
37+
_, err := kingpinApp.Parse([]string{})
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
3342
exporter := New(
3443
context.Background(),
3544
dsn,
3645
[]Scraper{
3746
ScrapeGlobalStatus{},
3847
},
3948
log.NewNopLogger(),
49+
exporterConfig,
4050
)
4151

4252
convey.Convey("Metrics describing", t, func() {

Diff for: collector/heartbeat.go

+25-20
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,6 @@ const (
3636
heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`"
3737
)
3838

39-
var (
40-
collectHeartbeatDatabase = kingpin.Flag(
41-
"collect.heartbeat.database",
42-
"Database from where to collect heartbeat data",
43-
).Default("heartbeat").String()
44-
collectHeartbeatTable = kingpin.Flag(
45-
"collect.heartbeat.table",
46-
"Table from where to collect heartbeat data",
47-
).Default("heartbeat").String()
48-
collectHeartbeatUtc = kingpin.Flag(
49-
"collect.heartbeat.utc",
50-
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
51-
).Bool()
52-
)
53-
5439
// Metric descriptors.
5540
var (
5641
HeartbeatStoredDesc = prometheus.NewDesc(
@@ -74,7 +59,11 @@ var (
7459
// server_id int unsigned NOT NULL PRIMARY KEY,
7560
//
7661
// );
77-
type ScrapeHeartbeat struct{}
62+
type ScrapeHeartbeat struct {
63+
Database string
64+
Table string
65+
UTC bool
66+
}
7867

7968
// Name of the Scraper. Should be unique.
8069
func (ScrapeHeartbeat) Name() string {
@@ -91,18 +80,34 @@ func (ScrapeHeartbeat) Version() float64 {
9180
return 5.1
9281
}
9382

83+
// RegisterFlags adds flags to configure the Scraper.
84+
func (s *ScrapeHeartbeat) RegisterFlags(application *kingpin.Application) {
85+
application.Flag(
86+
"collect.heartbeat.database",
87+
"Database from where to collect heartbeat data",
88+
).Default("heartbeat").StringVar(&s.Database)
89+
application.Flag(
90+
"collect.heartbeat.table",
91+
"Table from where to collect heartbeat data",
92+
).Default("heartbeat").StringVar(&s.Table)
93+
application.Flag(
94+
"collect.heartbeat.utc",
95+
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
96+
).BoolVar(&s.UTC)
97+
}
98+
9499
// nowExpr returns a current timestamp expression.
95-
func nowExpr() string {
96-
if *collectHeartbeatUtc {
100+
func (s ScrapeHeartbeat) nowExpr() string {
101+
if s.UTC {
97102
return "UTC_TIMESTAMP(6)"
98103
}
99104
return "NOW(6)"
100105
}
101106

102107
// Scrape collects data from database connection and sends it over channel as prometheus metric.
103-
func (ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
108+
func (s ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
104109
db := instance.getDB()
105-
query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
110+
query := fmt.Sprintf(heartbeatQuery, s.nowExpr(), s.Database, s.Table)
106111
heartbeatRows, err := db.QueryContext(ctx, query)
107112
if err != nil {
108113
return err

Diff for: collector/heartbeat_test.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@ var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{
5555
func TestScrapeHeartbeat(t *testing.T) {
5656
for _, tt := range ScrapeHeartbeatTestCases {
5757
t.Run(fmt.Sprint(tt.Args), func(t *testing.T) {
58-
_, err := kingpin.CommandLine.Parse(tt.Args)
58+
scraper := ScrapeHeartbeat{}
59+
60+
app := kingpin.New("TestScrapeHeartbeat", "")
61+
scraper.RegisterFlags(app)
62+
63+
_, err := app.Parse(tt.Args)
5964
if err != nil {
6065
t.Fatal(err)
6166
}
@@ -73,7 +78,7 @@ func TestScrapeHeartbeat(t *testing.T) {
7378

7479
ch := make(chan prometheus.Metric)
7580
go func() {
76-
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
81+
if err = scraper.Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
7782
t.Errorf("error calling function on test: %s", err)
7883
}
7984
close(ch)

Diff for: collector/info_schema_processlist.go

+26-21
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,6 @@ const infoSchemaProcesslistQuery = `
4141
GROUP BY user, host, command, state
4242
`
4343

44-
// Tunable flags.
45-
var (
46-
processlistMinTime = kingpin.Flag(
47-
"collect.info_schema.processlist.min_time",
48-
"Minimum time a thread must be in each state to be counted",
49-
).Default("0").Int()
50-
processesByUserFlag = kingpin.Flag(
51-
"collect.info_schema.processlist.processes_by_user",
52-
"Enable collecting the number of processes by user",
53-
).Default("true").Bool()
54-
processesByHostFlag = kingpin.Flag(
55-
"collect.info_schema.processlist.processes_by_host",
56-
"Enable collecting the number of processes by host",
57-
).Default("true").Bool()
58-
)
59-
6044
// Metric descriptors.
6145
var (
6246
processlistCountDesc = prometheus.NewDesc(
@@ -78,7 +62,11 @@ var (
7862
)
7963

8064
// ScrapeProcesslist collects from `information_schema.processlist`.
81-
type ScrapeProcesslist struct{}
65+
type ScrapeProcesslist struct {
66+
ProcessListMinTime int
67+
ProcessesByUserFlag bool
68+
ProcessesByHostFlag bool
69+
}
8270

8371
// Name of the Scraper. Should be unique.
8472
func (ScrapeProcesslist) Name() string {
@@ -95,11 +83,27 @@ func (ScrapeProcesslist) Version() float64 {
9583
return 5.1
9684
}
9785

86+
// RegisterFlags adds flags to configure the Scraper.
87+
func (s *ScrapeProcesslist) RegisterFlags(application *kingpin.Application) {
88+
application.Flag(
89+
"collect.info_schema.processlist.min_time",
90+
"Minimum time a thread must be in each state to be counted",
91+
).Default("0").IntVar(&s.ProcessListMinTime)
92+
application.Flag(
93+
"collect.info_schema.processlist.processes_by_user",
94+
"Enable collecting the number of processes by user",
95+
).Default("true").BoolVar(&s.ProcessesByUserFlag)
96+
application.Flag(
97+
"collect.info_schema.processlist.processes_by_host",
98+
"Enable collecting the number of processes by host",
99+
).Default("true").BoolVar(&s.ProcessesByHostFlag)
100+
}
101+
98102
// Scrape collects data from database connection and sends it over channel as prometheus metric.
99-
func (ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
103+
func (s ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
100104
processQuery := fmt.Sprintf(
101105
infoSchemaProcesslistQuery,
102-
*processlistMinTime,
106+
s.ProcessListMinTime,
103107
)
104108
db := instance.getDB()
105109
processlistRows, err := db.QueryContext(ctx, processQuery)
@@ -162,12 +166,13 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, instance *instance, ch chan
162166
}
163167
}
164168

165-
if *processesByHostFlag {
169+
if s.ProcessesByHostFlag {
166170
for _, host := range sortedMapKeys(stateHostCounts) {
167171
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
168172
}
169173
}
170-
if *processesByUserFlag {
174+
175+
if s.ProcessesByUserFlag {
171176
for _, user := range sortedMapKeys(stateUserCounts) {
172177
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
173178
}

Diff for: collector/info_schema_processlist_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ import (
2727
)
2828

2929
func TestScrapeProcesslist(t *testing.T) {
30-
_, err := kingpin.CommandLine.Parse([]string{
30+
scraper := ScrapeProcesslist{}
31+
app := kingpin.New("TestScrapeProcesslist", "")
32+
scraper.RegisterFlags(app)
33+
34+
_, err := app.Parse([]string{
3135
"--collect.info_schema.processlist.processes_by_user",
3236
"--collect.info_schema.processlist.processes_by_host",
3337
})
@@ -57,7 +61,7 @@ func TestScrapeProcesslist(t *testing.T) {
5761

5862
ch := make(chan prometheus.Metric)
5963
go func() {
60-
if err = (ScrapeProcesslist{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
64+
if err = scraper.Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
6165
t.Errorf("error calling function on test: %s", err)
6266
}
6367
close(ch)

Diff for: collector/info_schema_tables.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,6 @@ const (
5050
`
5151
)
5252

53-
// Tunable flags.
54-
var (
55-
tableSchemaDatabases = kingpin.Flag(
56-
"collect.info_schema.tables.databases",
57-
"The list of databases to collect table stats for, or '*' for all",
58-
).Default("*").String()
59-
)
60-
6153
// Metric descriptors.
6254
var (
6355
infoSchemaTablesVersionDesc = prometheus.NewDesc(
@@ -78,7 +70,9 @@ var (
7870
)
7971

8072
// ScrapeTableSchema collects from `information_schema.tables`.
81-
type ScrapeTableSchema struct{}
73+
type ScrapeTableSchema struct {
74+
Databases string
75+
}
8276

8377
// Name of the Scraper. Should be unique.
8478
func (ScrapeTableSchema) Name() string {
@@ -95,11 +89,19 @@ func (ScrapeTableSchema) Version() float64 {
9589
return 5.1
9690
}
9791

92+
// RegisterFlags adds flags to configure the Scraper.
93+
func (s *ScrapeTableSchema) RegisterFlags(application *kingpin.Application) {
94+
application.Flag(
95+
"collect.info_schema.tables.databases",
96+
"The list of databases to collect table stats for, or '*' for all",
97+
).Default("*").StringVar(&s.Databases)
98+
}
99+
98100
// Scrape collects data from database connection and sends it over channel as prometheus metric.
99-
func (ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
101+
func (s ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
100102
var dbList []string
101103
db := instance.getDB()
102-
if *tableSchemaDatabases == "*" {
104+
if s.Databases == "*" {
103105
dbListRows, err := db.QueryContext(ctx, dbListQuery)
104106
if err != nil {
105107
return err
@@ -117,7 +119,7 @@ func (ScrapeTableSchema) Scrape(ctx context.Context, instance *instance, ch chan
117119
dbList = append(dbList, database)
118120
}
119121
} else {
120-
dbList = strings.Split(*tableSchemaDatabases, ",")
122+
dbList = strings.Split(s.Databases, ",")
121123
}
122124

123125
for _, database := range dbList {

Diff for: collector/mysql_user.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,6 @@ const mysqlUserQuery = `
6969
FROM mysql.user
7070
`
7171

72-
// Tunable flags.
73-
var (
74-
userPrivilegesFlag = kingpin.Flag(
75-
"collect.mysql.user.privileges",
76-
"Enable collecting user privileges from mysql.user",
77-
).Default("false").Bool()
78-
)
79-
8072
var (
8173
labelNames = []string{"mysql_user", "hostmask"}
8274
)
@@ -102,7 +94,9 @@ var (
10294
)
10395

10496
// ScrapeUser collects from `information_schema.processlist`.
105-
type ScrapeUser struct{}
97+
type ScrapeUser struct {
98+
Privileges bool
99+
}
106100

107101
// Name of the Scraper. Should be unique.
108102
func (ScrapeUser) Name() string {
@@ -119,8 +113,16 @@ func (ScrapeUser) Version() float64 {
119113
return 5.1
120114
}
121115

116+
// RegisterFlags adds flags to configure the Scraper.
117+
func (s *ScrapeUser) RegisterFlags(application *kingpin.Application) {
118+
application.Flag(
119+
"collect.mysql.user.privileges",
120+
"Enable collecting user privileges from mysql.user",
121+
).Default("false").BoolVar(&s.Privileges)
122+
}
123+
122124
// Scrape collects data from database connection and sends it over channel as prometheus metric.
123-
func (ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
125+
func (s ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
124126
db := instance.getDB()
125127
var (
126128
userRows *sql.Rows
@@ -214,7 +216,7 @@ func (ScrapeUser) Scrape(ctx context.Context, instance *instance, ch chan<- prom
214216
return err
215217
}
216218

217-
if *userPrivilegesFlag {
219+
if s.Privileges {
218220
userCols, err := userRows.Columns()
219221
if err != nil {
220222
return err

0 commit comments

Comments (0)