Skip to content

Commit 2c7a3dd

Browse files
authored
*: better metrics support (tikv#24)
* support push metrics Signed-off-by: disksing <[email protected]> * cleanup metrics and add grafana template Signed-off-by: disksing <[email protected]> * address comment Signed-off-by: disksing <[email protected]> * minor cleanup Signed-off-by: disksing <[email protected]>
1 parent 99b5488 commit 2c7a3dd

File tree

7 files changed

+4047
-79
lines changed

7 files changed

+4047
-79
lines changed

examples/bench/bench.go

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Copyright 2019 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"bytes"
18+
"context"
19+
"flag"
20+
"fmt"
21+
"log"
22+
"math/rand"
23+
"strings"
24+
"time"
25+
26+
"github.com/tikv/client-go/config"
27+
"github.com/tikv/client-go/metrics"
28+
"github.com/tikv/client-go/rawkv"
29+
"github.com/tikv/client-go/txnkv"
30+
)
31+
32+
// Command-line flags of the benchmark. Every value is a pointer that is only
// meaningful after flag.Parse has run.
var (
33+
// Comma-separated PD endpoints and the workload family to run.
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
34+
mode = flag.String("mode", "raw", "raw / txn")
35+
36+
// Metrics push settings, handed to metrics.PushMetrics by main.
pushAddr = flag.String("push", "127.0.0.1:9090", "pushGateway address")
37+
pushInterval = flag.Duration("interval", 15*time.Second, "push metrics interval")
38+
pushJob = flag.String("job", "bench", "job name")
39+
pushInstance = flag.String("instance", "bench1", "instance name")
40+
41+
// Shape of the generated data: key width, value size and the number of
// distinct keys that random keys are drawn from.
keyLen = flag.Int("klen", 10, "length of key")
42+
valueLen = flag.Int("vlen", 20, "length of value")
43+
keyRange = flag.Int("range", 100000, "size of the key set")
44+
45+
// Raw mode: a "-p" flag is the number of goroutines looping over that
// operation (0 disables it), "-n" is the batch size, "-l" the scan limit.
rawGetP = flag.Int("raw-get-p", 1, "raw get concurrency")
46+
rawBatchGetP = flag.Int("raw-batch-get-p", 0, "raw batch get concurrency")
47+
rawBatchGetN = flag.Int("raw-batch-get-n", 10, "raw batch get batch size")
48+
rawPutP = flag.Int("raw-put-p", 1, "raw put concurrency")
49+
rawBatchPutP = flag.Int("raw-batch-put-p", 0, "raw batch put concurrency")
50+
rawBatchPutN = flag.Int("raw-batch-put-n", 10, "raw batch put batch size")
51+
rawDeleteP = flag.Int("raw-delete-p", 1, "raw delete concurrency")
52+
rawBatchDeleteP = flag.Int("raw-batch-delete-p", 0, "raw batch delete concurrency")
53+
rawBatchDeleteN = flag.Int("raw-batch-delete-n", 10, "raw batch delete batch size")
54+
rawScanP = flag.Int("raw-scan-p", 1, "raw scan concurrency")
55+
rawScanL = flag.Int("raw-scan-l", 10, "raw scan limit")
56+
57+
// Txn mode, profile 1 (read-mostly by default): goroutine count plus the
// number of get/put/delete/scan commands issued inside each transaction.
txn1P = flag.Int("txn1-p", 1, "txn1 concurrency")
58+
txn1GetN = flag.Int("txn1-get-n", 10, "txn1 get command count")
59+
txn1PutN = flag.Int("txn1-put-n", 0, "txn1 put command count")
60+
txn1DeleteN = flag.Int("txn1-delete-n", 0, "txn1 delete command count")
61+
txn1ScanN = flag.Int("txn1-scan-n", 1, "txn1 scan command count")
62+
txn1ScanL = flag.Int("txn1-scan-l", 10, "txn1 scan limit")
63+
64+
// Txn mode, profile 2 (write-mostly by default).
txn2P = flag.Int("txn2-p", 2, "txn2 concurrency")
65+
txn2GetN = flag.Int("txn2-get-n", 0, "txn2 get command count")
66+
txn2PutN = flag.Int("txn2-put-n", 10, "txn2 put command count")
67+
txn2DeleteN = flag.Int("txn2-delete-n", 1, "txn2 delete command count")
68+
txn2ScanN = flag.Int("txn2-scan-n", 0, "txn2 scan command count")
69+
txn2ScanL = flag.Int("txn2-scan-l", 10, "txn2 scan limit")
70+
71+
// Txn mode, profile 3 (mixed; disabled by default because its concurrency
// is 0).
txn3P = flag.Int("txn3-p", 0, "txn3 concurrency")
72+
txn3GetN = flag.Int("txn3-get-n", 1, "txn3 get command count")
73+
txn3PutN = flag.Int("txn3-put-n", 1, "txn3 put command count")
74+
txn3DeleteN = flag.Int("txn3-delete-n", 1, "txn3 delete command count")
75+
txn3ScanN = flag.Int("txn3-scan-n", 1, "txn3 scan command count")
76+
txn3ScanL = flag.Int("txn3-scan-l", 10, "txn3 scan limit")
77+
)
78+
79+
func newConfig() config.Config {
80+
return config.Default()
81+
}
82+
83+
// Package-level clients shared by the worker goroutines. rawCli is assigned
// in benchRaw and txnCli in benchTxn, so only the one matching the selected
// mode is ever non-nil.
var (
84+
rawCli *rawkv.Client
85+
txnCli *txnkv.Client
86+
)
87+
88+
func k() []byte {
89+
var t string
90+
if *mode == "raw" {
91+
t = fmt.Sprintf("R%%%dd", *keyLen-1)
92+
} else {
93+
t = fmt.Sprintf("T%%%dd", *keyLen-1)
94+
}
95+
return []byte(fmt.Sprintf(t, rand.Intn(*keyRange)))
96+
}
97+
98+
func v() []byte {
99+
return bytes.Repeat([]byte{0}, *valueLen)
100+
}
101+
102+
// n calls f exactly x times, in order, and returns the x results as a slice.
// The returned slice is non-nil even when x is 0.
func n(x int, f func() []byte) [][]byte {
	out := make([][]byte, x)
	for idx := 0; idx < len(out); idx++ {
		out[idx] = f()
	}
	return out
}
109+
110+
// nk returns x random keys, each generated by k.
func nk(x int) [][]byte {
	return n(x, k)
}
111+
// nv returns x benchmark values, each generated by v.
func nv(x int) [][]byte {
	return n(x, v)
}
112+
113+
// P launches p goroutines, each of which invokes f over and over in an
// endless loop. It returns as soon as the goroutines have been started and
// offers no way to stop them. A non-positive p starts nothing.
func P(p int, f func()) {
	forever := func() {
		for {
			f()
		}
	}
	for started := 0; started < p; started++ {
		go forever()
	}
}
122+
123+
func benchRaw() {
124+
var err error
125+
rawCli, err = rawkv.NewClient(context.TODO(), strings.Split(*pdAddr, ","), newConfig())
126+
if err != nil {
127+
log.Fatal(err)
128+
}
129+
130+
P(*rawGetP, func() { rawCli.Get(context.TODO(), k()) })
131+
P(*rawBatchGetP, func() { rawCli.BatchGet(context.TODO(), nk(*rawBatchGetN)) })
132+
P(*rawPutP, func() { rawCli.Put(context.TODO(), k(), v()) })
133+
P(*rawBatchPutP, func() { rawCli.BatchPut(context.TODO(), nk(*rawBatchPutN), nv(*rawBatchPutN)) })
134+
P(*rawDeleteP, func() { rawCli.Delete(context.TODO(), k()) })
135+
P(*rawBatchDeleteP, func() { rawCli.BatchDelete(context.TODO(), nk(*rawBatchDeleteN)) })
136+
P(*rawScanP, func() { rawCli.Scan(context.TODO(), k(), nil, *rawScanL) })
137+
}
138+
139+
func benchTxn() {
140+
var err error
141+
txnCli, err = txnkv.NewClient(context.TODO(), strings.Split(*pdAddr, ","), newConfig())
142+
if err != nil {
143+
log.Fatal(err)
144+
}
145+
146+
t := func(getN, putN, delN, scanN, scanL int) func() {
147+
return func() {
148+
tx, err := txnCli.Begin(context.TODO())
149+
if err != nil {
150+
return
151+
}
152+
for i := 0; i < getN; i++ {
153+
tx.Get(context.TODO(), k())
154+
}
155+
for i := 0; i < putN; i++ {
156+
tx.Set(k(), v())
157+
}
158+
for i := 0; i < delN; i++ {
159+
tx.Delete(k())
160+
}
161+
for i := 0; i < scanN; i++ {
162+
it, err := tx.Iter(context.TODO(), k(), nil)
163+
if err != nil {
164+
continue
165+
}
166+
for j := 0; j < scanL && it.Valid(); j++ {
167+
it.Next(context.TODO())
168+
}
169+
it.Close()
170+
}
171+
tx.Commit(context.TODO())
172+
}
173+
}
174+
175+
P(*txn1P, t(*txn1GetN, *txn1PutN, *txn1DeleteN, *txn1ScanN, *txn1ScanL))
176+
P(*txn2P, t(*txn2GetN, *txn2PutN, *txn2DeleteN, *txn2ScanN, *txn2ScanL))
177+
P(*txn3P, t(*txn3GetN, *txn3PutN, *txn3DeleteN, *txn3ScanN, *txn3ScanL))
178+
}
179+
180+
func main() {
181+
flag.Parse()
182+
183+
go metrics.PushMetrics(context.TODO(), *pushAddr, *pushInterval, *pushJob, *pushInstance)
184+
185+
switch *mode {
186+
case "raw":
187+
benchRaw()
188+
case "txn":
189+
benchTxn()
190+
default:
191+
log.Fatal("invalid mode:", *mode)
192+
}
193+
194+
for {
195+
fmt.Print(".")
196+
time.Sleep(time.Second)
197+
}
198+
}

metrics/metrics.go

+11-38
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package metrics
1616
import "github.com/prometheus/client_golang/prometheus"
1717

1818
// Client metrics.
19-
// TODO: Create new grafana page for the metrics.
2019
var (
2120
TxnCounter = prometheus.NewCounter(
2221
prometheus.CounterOpts{
@@ -26,6 +25,16 @@ var (
2625
Help: "Counter of created txns.",
2726
})
2827

28+
TxnHistogram = prometheus.NewHistogram(
29+
prometheus.HistogramOpts{
30+
Namespace: "tikv",
31+
Subsystem: "client_go",
32+
Name: "txn_durations_seconds",
33+
Help: "Bucketed histogram of processing txn",
34+
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
35+
},
36+
)
37+
2938
SnapshotCounter = prometheus.NewCounter(
3039
prometheus.CounterOpts{
3140
Namespace: "tikv",
@@ -34,14 +43,6 @@ var (
3443
Help: "Counter of snapshots.",
3544
})
3645

37-
TxnCmdCounter = prometheus.NewCounterVec(
38-
prometheus.CounterOpts{
39-
Namespace: "tikv",
40-
Subsystem: "client_go",
41-
Name: "txn_cmd_total",
42-
Help: "Counter of txn commands.",
43-
}, []string{"type"})
44-
4546
TxnCmdHistogram = prometheus.NewHistogramVec(
4647
prometheus.HistogramOpts{
4748
Namespace: "tikv",
@@ -68,15 +69,6 @@ var (
6869
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
6970
})
7071

71-
ConnPoolHistogram = prometheus.NewHistogramVec(
72-
prometheus.HistogramOpts{
73-
Namespace: "tikv",
74-
Subsystem: "client_go",
75-
Name: "get_conn_seconds",
76-
Help: "Bucketed histogram of taking conn from conn pool.",
77-
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
78-
}, []string{"type"})
79-
8072
SendReqHistogram = prometheus.NewHistogramVec(
8173
prometheus.HistogramOpts{
8274
Namespace: "tikv",
@@ -86,23 +78,6 @@ var (
8678
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
8779
}, []string{"type", "store"})
8880

89-
CoprocessorCounter = prometheus.NewCounterVec(
90-
prometheus.CounterOpts{
91-
Namespace: "tikv",
92-
Subsystem: "client_go",
93-
Name: "cop_actions_total",
94-
Help: "Counter of coprocessor actions.",
95-
}, []string{"type"})
96-
97-
CoprocessorHistogram = prometheus.NewHistogram(
98-
prometheus.HistogramOpts{
99-
Namespace: "tikv",
100-
Subsystem: "client_go",
101-
Name: "cop_duration_seconds",
102-
Help: "Run duration of a single coprocessor task, includes backoff time.",
103-
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
104-
})
105-
10681
LockResolverCounter = prometheus.NewCounterVec(
10782
prometheus.CounterOpts{
10883
Namespace: "tikv",
@@ -238,13 +213,11 @@ func RetLabel(err error) string {
238213
func init() {
239214
prometheus.MustRegister(TxnCounter)
240215
prometheus.MustRegister(SnapshotCounter)
216+
prometheus.MustRegister(TxnHistogram)
241217
prometheus.MustRegister(TxnCmdHistogram)
242218
prometheus.MustRegister(BackoffCounter)
243219
prometheus.MustRegister(BackoffHistogram)
244220
prometheus.MustRegister(SendReqHistogram)
245-
prometheus.MustRegister(ConnPoolHistogram)
246-
prometheus.MustRegister(CoprocessorCounter)
247-
prometheus.MustRegister(CoprocessorHistogram)
248221
prometheus.MustRegister(LockResolverCounter)
249222
prometheus.MustRegister(RegionErrorCounter)
250223
prometheus.MustRegister(TxnWriteKVCountHistogram)

metrics/push.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2019 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package metrics
15+
16+
import (
17+
"context"
18+
"time"
19+
20+
"github.com/prometheus/client_golang/prometheus"
21+
"github.com/prometheus/client_golang/prometheus/push"
22+
log "github.com/sirupsen/logrus"
23+
)
24+
25+
// PushMetrics pushes metrics to Prometheus Pushgateway.
26+
// Note:
27+
// * Normally, you need to start a goroutine to push metrics: `go
28+
// PushMetrics(...)`
29+
// * `instance` should be global identical -- NO 2 processes share a same
30+
// `instance`.
31+
// * `job` is used to distinguish different workloads, DO NOT use too many `job`
32+
// labels since there are grafana panels that groups by `job`.
33+
func PushMetrics(ctx context.Context, addr string, interval time.Duration, job, instance string) {
34+
ticker := time.NewTicker(interval)
35+
defer ticker.Stop()
36+
37+
for {
38+
select {
39+
case <-ctx.Done():
40+
return
41+
case <-ticker.C:
42+
}
43+
44+
err := push.AddFromGatherer(
45+
job,
46+
map[string]string{"instance": instance},
47+
addr,
48+
prometheus.DefaultGatherer,
49+
)
50+
if err != nil {
51+
log.Errorf("cannot push metrics to prometheus pushgateway: %v", err)
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)