diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md
index 61cf6be1cf8..db2494d3794 100644
--- a/changelog/18.0/18.0.0/summary.md
+++ b/changelog/18.0/18.0.0/summary.md
@@ -16,8 +16,10 @@
- [Deleted `V3` planner](#deleted-v3)
- [Deleted `k8stopo`](#deleted-k8stopo)
- [Deleted `vtgr`](#deleted-vtgr)
+ - [Deprecated VTBackup stat `DurationByPhase`](#deprecated-vtbackup-stat-duration-by-phase)
- **[New stats](#new-stats)**
- [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters)
+ - [VTBackup stat `Phase`](#vtbackup-stat-phase)
- [VTBackup stat `PhaseStatus`](#vtbackup-stat-phase-status)
- **[VTTablet](#vttablet)**
- [VTTablet: New ResetSequences RPC](#vttablet-new-rpc-reset-sequences)
@@ -115,17 +117,37 @@ the `k8stopo` has been removed.
The `vtgr` has been deprecated in Vitess 17, also see https://github.com/vitessio/vitess/issues/13300. With Vitess 18 `vtgr` has been removed.
+#### Deprecated VTBackup stat `DurationByPhase`
+
+VTBackup stat `DurationByPhase` is deprecated. Use the binary-valued `Phase` stat instead.
+
### New stats
#### VTGate Vindex unknown parameters
The VTGate stat `VindexUnknownParameters` gauges unknown Vindex parameters found in the latest VSchema pulled from the topology.
+#### VTBackup `Phase` stat
+
+In v17, the `vtbackup` stat `DurationByPhase` was added to measure the time spent by `vtbackup` in each phase. This stat turned out to be awkward to use in production, and has been replaced in v18 by a binary-valued `Phase` stat.
+
+`Phase` reports a 1 (active) or a 0 (inactive) for each of the following phases:
+
+ * `CatchupReplication`
+ * `InitialBackup`
+ * `RestoreLastBackup`
+ * `TakeNewBackup`
+
+To calculate how long `vtbackup` has spent in a given phase, sum the 1-valued data points over time and multiply by the data collection or reporting interval. For example, in Prometheus:
+
+```
+sum_over_time(vtbackup_phase{phase="TakeNewBackup"}) * <interval>
+```
#### VTBackup `PhaseStatus` stat
`PhaseStatus` reports a 1 (active) or a 0 (inactive) for each of the following phases and statuses:
- * `CatchUpReplication` phase has statuses `Stalled` and `Stopped`.
+ * `CatchupReplication` phase has statuses `Stalled` and `Stopped`.
* `Stalled` is set to `1` when replication stops advancing.
* `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary.
@@ -165,4 +187,4 @@ removing Vitess support.
#### New Durability Policies
-2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs.
\ No newline at end of file
+2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs.
diff --git a/go/cmd/vtbackup/plugin_opentsdb.go b/go/cmd/vtbackup/plugin_opentsdb.go
new file mode 100644
index 00000000000..44ac886d420
--- /dev/null
+++ b/go/cmd/vtbackup/plugin_opentsdb.go
@@ -0,0 +1,25 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import "vitess.io/vitess/go/stats/opentsdb"
+
+// This plugin imports opentsdb to register the opentsdb stats backend.
+
+func init() {
+ opentsdb.Init("vtbackup")
+}
diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go
index f27a991d35e..ebf83526cad 100644
--- a/go/cmd/vtbackup/vtbackup.go
+++ b/go/cmd/vtbackup/vtbackup.go
@@ -100,9 +100,12 @@ const (
// forever for things that should be quick.
operationTimeout = 1 * time.Minute
- phaseNameCatchUpReplication = "CatchUpReplication"
- phaseStatusCatchUpReplicationStalled = "Stalled"
- phaseStatusCatchUpReplicationStopped = "Stopped"
+ phaseNameCatchupReplication = "CatchupReplication"
+ phaseNameInitialBackup = "InitialBackup"
+ phaseNameRestoreLastBackup = "RestoreLastBackup"
+ phaseNameTakeNewBackup = "TakeNewBackup"
+ phaseStatusCatchupReplicationStalled = "Stalled"
+ phaseStatusCatchupReplicationStopped = "Stopped"
)
var (
@@ -127,20 +130,44 @@ var (
detachedMode bool
keepAliveTimeout = 0 * time.Second
disableRedoLog = false
- durationByPhase = stats.NewGaugesWithSingleLabel(
+
+ // Deprecated, use "Phase" instead.
+ deprecatedDurationByPhase = stats.NewGaugesWithSingleLabel(
"DurationByPhaseSeconds",
- "How long it took vtbackup to perform each phase (in seconds).",
+ "[DEPRECATED] How long it took vtbackup to perform each phase (in seconds).",
+ "phase",
+ )
+
+ // This gauge is updated 3*N times during the course of a vtbackup run,
+ // where N is the number of different phases vtbackup transitions through.
+ // Once to initialize to 0, another time to set the phase to active (1),
+ // and another to deactivate the phase (back to 0).
+ //
+ // At most a single phase is active at a given time.
+ //
+ // The sync gauge immediately reports changes to push-based backends.
+ // The benefit of the sync gauge is that it makes verifying stats in
+ // integration tests a lot more tractable.
+ phase = stats.NewSyncGaugesWithSingleLabel(
+ "Phase",
+ "Active phase.",
"phase",
)
+ phaseNames = []string{
+ phaseNameCatchupReplication,
+ phaseNameInitialBackup,
+ phaseNameRestoreLastBackup,
+ phaseNameTakeNewBackup,
+ }
phaseStatus = stats.NewGaugesWithMultiLabels(
"PhaseStatus",
"Internal state of vtbackup phase.",
[]string{"phase", "status"},
)
phaseStatuses = map[string][]string{
- phaseNameCatchUpReplication: {
- phaseStatusCatchUpReplicationStalled,
- phaseStatusCatchUpReplicationStopped,
+ phaseNameCatchupReplication: {
+ phaseStatusCatchupReplicationStalled,
+ phaseStatusCatchupReplicationStopped,
},
}
)
@@ -219,6 +246,9 @@ func main() {
defer topoServer.Close()
// Initialize stats.
+ for _, phaseName := range phaseNames {
+ phase.Set(phaseName, int64(0))
+ }
for phaseName, statuses := range phaseStatuses {
for _, status := range statuses {
phaseStatus.Set([]string{phaseName, status}, 0)
@@ -294,7 +324,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err := mysqld.Init(initCtx, mycnf, initDBSQLFile); err != nil {
return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err)
}
- durationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds()))
+ deprecatedDurationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds()))
// Shut down mysqld when we're done.
defer func() {
// Be careful not to use the original context, because we don't want to
@@ -358,14 +388,19 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
backupParams.BackupTime = time.Now()
// Now we're ready to take the backup.
+ phase.Set(phaseNameInitialBackup, int64(1))
+ defer phase.Set(phaseNameInitialBackup, int64(0))
if err := mysqlctl.Backup(ctx, backupParams); err != nil {
return fmt.Errorf("backup failed: %v", err)
}
- durationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds()))
+ deprecatedDurationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds()))
log.Info("Initial backup successful.")
+ phase.Set(phaseNameInitialBackup, int64(0))
return nil
}
+ phase.Set(phaseNameRestoreLastBackup, int64(1))
+ defer phase.Set(phaseNameRestoreLastBackup, int64(0))
backupDir := mysqlctl.GetBackupDir(initKeyspace, initShard)
log.Infof("Restoring latest backup from directory %v", backupDir)
restoreAt := time.Now()
@@ -397,7 +432,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
default:
return fmt.Errorf("can't restore from backup: %v", err)
}
- durationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds()))
+ deprecatedDurationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds()))
+ phase.Set(phaseNameRestoreLastBackup, int64(0))
// As of MySQL 8.0.21, you can disable redo logging using the ALTER INSTANCE
// DISABLE INNODB REDO_LOG statement. This functionality is intended for
@@ -455,6 +491,9 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
backupParams.BackupTime = time.Now()
// Wait for replication to catch up.
+ phase.Set(phaseNameCatchupReplication, int64(1))
+ defer phase.Set(phaseNameCatchupReplication, int64(0))
+
var (
lastStatus replication.ReplicationStatus
status replication.ReplicationStatus
@@ -479,26 +518,27 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
// We're caught up on replication to at least the point the primary
// was at when this vtbackup run started.
log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime))
- durationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds()))
+ deprecatedDurationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds()))
break
}
if !lastStatus.Position.IsZero() {
if status.Position.Equal(lastStatus.Position) {
- phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 1)
+ phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 1)
} else {
- phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0)
+ phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 0)
}
}
if !status.Healthy() {
log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.")
- phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 1)
+ phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 1)
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
}
} else {
- phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0)
+ phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0)
}
}
+ phase.Set(phaseNameCatchupReplication, int64(0))
// Stop replication and see where we are.
if err := mysqld.StopReplication(nil); err != nil {
@@ -514,8 +554,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if !status.Position.AtLeast(primaryPos) && status.Position.Equal(restorePos) {
return fmt.Errorf("not taking backup: replication did not make any progress from restore point: %v", restorePos)
}
- phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0)
- phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0)
+ phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 0)
+ phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0)
// Re-enable redo logging.
if disabledRedoLog {
@@ -539,15 +579,18 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err := mysqld.Start(ctx, mycnf); err != nil {
return fmt.Errorf("Could not start MySQL after full shutdown: %v", err)
}
- durationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds()))
+ deprecatedDurationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds()))
}
// Now we can take a new backup.
backupAt := time.Now()
+ phase.Set(phaseNameTakeNewBackup, int64(1))
+ defer phase.Set(phaseNameTakeNewBackup, int64(0))
if err := mysqlctl.Backup(ctx, backupParams); err != nil {
return fmt.Errorf("error taking backup: %v", err)
}
- durationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds()))
+ deprecatedDurationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds()))
+ phase.Set(phaseNameTakeNewBackup, int64(0))
// Return a non-zero exit code if we didn't meet the replication position
// goal, even though we took a backup that pushes the high-water mark up.
diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt
index 44cb2a08462..46e4efea301 100644
--- a/go/flags/endtoend/vtbackup.txt
+++ b/go/flags/endtoend/vtbackup.txt
@@ -134,6 +134,7 @@ Usage of vtbackup:
--mysql_server_version string MySQL server version to advertise. (default "8.0.30-Vitess")
--mysql_socket string path to the mysql socket
--mysql_timeout duration how long to wait for mysqld startup (default 5m0s)
+ --opentsdb_uri string URI of opentsdb /api/put method
--port int port for the server
--pprof strings enable profiling
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
diff --git a/go/stats/counters.go b/go/stats/counters.go
index 371cbd53818..a4dfc0dcb1f 100644
--- a/go/stats/counters.go
+++ b/go/stats/counters.go
@@ -321,6 +321,29 @@ func (g *GaugesWithSingleLabel) Set(name string, value int64) {
g.counters.set(name, value)
}
+// SyncGaugesWithSingleLabel is a GaugesWithSingleLabel that proactively pushes
+// stats to push-based backends when Set is called.
+type SyncGaugesWithSingleLabel struct {
+ GaugesWithSingleLabel
+ name string
+}
+
+// NewSyncGaugesWithSingleLabel creates a new SyncGaugesWithSingleLabel.
+func NewSyncGaugesWithSingleLabel(name, help, label string, tags ...string) *SyncGaugesWithSingleLabel {
+ return &SyncGaugesWithSingleLabel{
+ GaugesWithSingleLabel: *NewGaugesWithSingleLabel(name, help, label, tags...),
+ name: name,
+ }
+}
+
+// Set sets the value of a named gauge.
+func (sg *SyncGaugesWithSingleLabel) Set(name string, value int64) {
+ sg.GaugesWithSingleLabel.Set(name, value)
+ if sg.name != "" {
+ _ = pushOne(sg.name, &sg.GaugesWithSingleLabel)
+ }
+}
+
// GaugesWithMultiLabels is a CountersWithMultiLabels implementation where
// the values can go up and down.
type GaugesWithMultiLabels struct {
diff --git a/go/stats/export.go b/go/stats/export.go
index e98ef0a969c..8bda85c87b2 100644
--- a/go/stats/export.go
+++ b/go/stats/export.go
@@ -121,6 +121,22 @@ func Publish(name string, v expvar.Var) {
publish(name, v)
}
+func pushAll() error {
+ backend, ok := pushBackends[statsBackend]
+ if !ok {
+ return fmt.Errorf("no PushBackend registered with name %s", statsBackend)
+ }
+ return backend.PushAll()
+}
+
+func pushOne(name string, v Variable) error {
+ backend, ok := pushBackends[statsBackend]
+ if !ok {
+ return fmt.Errorf("no PushBackend registered with name %s", statsBackend)
+ }
+ return backend.PushOne(name, v)
+}
+
// StringMapFuncWithMultiLabels is a multidimensional string map publisher.
//
// Map keys are compound names made with joining multiple strings with '.',
@@ -183,8 +199,10 @@ func publish(name string, v expvar.Var) {
// to be pushed to it. It's used to support push-based metrics backends, as expvar
// by default only supports pull-based ones.
type PushBackend interface {
- // PushAll pushes all stats from expvar to the backend
+ // PushAll pushes all stats from expvar to the backend.
PushAll() error
+ // PushOne pushes a single stat from expvar to the backend.
+ PushOne(name string, v Variable) error
}
var pushBackends = make(map[string]PushBackend)
@@ -214,13 +232,7 @@ func emitToBackend(emitPeriod *time.Duration) {
ticker := time.NewTicker(*emitPeriod)
defer ticker.Stop()
for range ticker.C {
- backend, ok := pushBackends[statsBackend]
- if !ok {
- log.Errorf("No PushBackend registered with name %s", statsBackend)
- return
- }
- err := backend.PushAll()
- if err != nil {
+ if err := pushAll(); err != nil {
// TODO(aaijazi): This might cause log spam...
log.Warningf("Pushing stats to backend %v failed: %v", statsBackend, err)
}
diff --git a/go/stats/opentsdb/backend.go b/go/stats/opentsdb/backend.go
new file mode 100644
index 00000000000..e5c766ba797
--- /dev/null
+++ b/go/stats/opentsdb/backend.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "time"
+
+ "vitess.io/vitess/go/stats"
+)
+
+// backend implements stats.PushBackend
+type backend struct {
+ // The prefix is the name of the binary (vtgate, vttablet, etc.) and will be
+ // prepended to all the stats reported.
+ prefix string
+ // Tags that should be included with every data point. If there's a tag name
+ // collision between the common tags and a single data point's tags, the data
+ // point tag will override the common tag.
+ commonTags map[string]string
+ // writer is used to send data points somewhere (file, http, ...).
+ writer writer
+}
+
+// PushAll pushes all stats to OpenTSDB
+func (b *backend) PushAll() error {
+ collector := b.collector()
+ collector.collectAll()
+ return b.writer.Write(collector.data)
+}
+
+// PushOne pushes a single stat to OpenTSDB
+func (b *backend) PushOne(name string, v stats.Variable) error {
+ collector := b.collector()
+ collector.collectOne(name, v)
+ return b.writer.Write(collector.data)
+}
+
+func (b *backend) collector() *collector {
+ return &collector{
+ commonTags: b.commonTags,
+ prefix: b.prefix,
+ timestamp: time.Now().Unix(),
+ }
+}
diff --git a/go/stats/opentsdb/by_metric.go b/go/stats/opentsdb/by_metric.go
new file mode 100644
index 00000000000..5177109a18e
--- /dev/null
+++ b/go/stats/opentsdb/by_metric.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+// byMetric implements sort.Interface for []*DataPoint based on the metric key
+// and then tag values (prioritized in tag name order). Having a consistent sort order
+// is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly
+// in the tests.
+type byMetric []*DataPoint
+
+func (m byMetric) Len() int { return len(m) }
+func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
+func (m byMetric) Less(i, j int) bool {
+ if m[i].Metric < m[j].Metric {
+ return true
+ }
+
+ if m[i].Metric > m[j].Metric {
+ return false
+ }
+
+ // Metric names are the same. We can use tag values to figure out the sort order.
+ // The deciding tag will be the lexicographically earliest tag name where tag values differ.
+ decidingTagName := ""
+ result := false
+ for tagName, iVal := range m[i].Tags {
+ jVal, ok := m[j].Tags[tagName]
+ if !ok {
+ // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier.
+ // This shouldn't happen in practice, though, if metric code is correct...
+ return true
+ }
+
+ if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") {
+ decidingTagName = tagName
+ result = iVal < jVal
+ }
+ }
+ return result
+}
diff --git a/go/stats/opentsdb/opentsdb.go b/go/stats/opentsdb/collector.go
similarity index 54%
rename from go/stats/opentsdb/opentsdb.go
rename to go/stats/opentsdb/collector.go
index 3e85052b5f4..9b870815067 100644
--- a/go/stats/opentsdb/opentsdb.go
+++ b/go/stats/opentsdb/collector.go
@@ -14,151 +14,47 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-// Package opentsdb adds support for pushing stats to opentsdb.
package opentsdb
import (
"bytes"
"encoding/json"
"expvar"
- "net/http"
- "sort"
"strings"
- "time"
"unicode"
- "github.com/spf13/pflag"
-
"vitess.io/vitess/go/stats"
- "vitess.io/vitess/go/vt/servenv"
)
-var openTsdbURI string
-
-func registerFlags(fs *pflag.FlagSet) {
- fs.StringVar(&openTsdbURI, "opentsdb_uri", openTsdbURI, "URI of opentsdb /api/put method")
-}
-
-func init() {
- servenv.OnParseFor("vtctld", registerFlags)
- servenv.OnParseFor("vtgate", registerFlags)
- servenv.OnParseFor("vttablet", registerFlags)
-}
-
-// dataPoint represents a single OpenTSDB data point.
-type dataPoint struct {
- // Example: sys.cpu.nice
- Metric string `json:"metric"`
- // Seconds or milliseconds since unix epoch.
- Timestamp float64 `json:"timestamp"`
- Value float64 `json:"value"`
- Tags map[string]string `json:"tags"`
-}
-
-// sendDataPoints pushes a list of data points to openTSDB.
-// All other code in this file is just to support getting this function called
-// with all stats represented as data points.
-func sendDataPoints(data []dataPoint) error {
- json, err := json.Marshal(data)
- if err != nil {
- return err
- }
-
- resp, err := http.Post(openTsdbURI, "application/json", bytes.NewReader(json))
- if err != nil {
- return err
- }
- resp.Body.Close()
- return nil
-}
-
-// openTSDBBackend implements stats.PushBackend
-type openTSDBBackend struct {
- // The prefix is the name of the binary (vtgate, vttablet, etc.) and will be
- // prepended to all the stats reported.
- prefix string
- // Tags that should be included with every data point. If there's a tag name
- // collision between the common tags and a single data point's tags, the data
- // point tag will override the common tag.
+// collector tracks state for a single pass of stats reporting / data collection.
+type collector struct {
commonTags map[string]string
-}
-
-// dataCollector tracks state for a single pass of stats reporting / data collection.
-type dataCollector struct {
- settings *openTSDBBackend
+ data []*DataPoint
+ prefix string
timestamp int64
- dataPoints []dataPoint
-}
-
-// Init attempts to create a singleton openTSDBBackend and register it as a PushBackend.
-// If it fails to create one, this is a noop. The prefix argument is an optional string
-// to prepend to the name of every data point reported.
-func Init(prefix string) {
- // Needs to happen in servenv.OnRun() instead of init because it requires flag parsing and logging
- servenv.OnRun(func() {
- InitWithoutServenv(prefix)
- })
-}
-
-// InitWithoutServenv initializes the opentsdb without servenv
-func InitWithoutServenv(prefix string) {
- if openTsdbURI == "" {
- return
- }
-
- backend := &openTSDBBackend{
- prefix: prefix,
- commonTags: stats.ParseCommonTags(stats.CommonTags),
- }
-
- stats.RegisterPushBackend("opentsdb", backend)
-
- servenv.HTTPHandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "application/json; charset=utf-8")
- dataPoints := (*backend).getDataPoints()
- sort.Sort(byMetric(dataPoints))
-
- if b, err := json.MarshalIndent(dataPoints, "", " "); err != nil {
- w.Write([]byte(err.Error()))
- } else {
- w.Write(b)
- }
- })
}
-// PushAll pushes all stats to OpenTSDB
-func (backend *openTSDBBackend) PushAll() error {
- return sendDataPoints(backend.getDataPoints())
-}
-
-// getDataPoints fetches all stats in an opentsdb-compatible format.
-// This is separated from PushAll() so it can be reused for the /debug/opentsdb handler.
-func (backend *openTSDBBackend) getDataPoints() []dataPoint {
- dataCollector := &dataCollector{
- settings: backend,
- timestamp: time.Now().Unix(),
- }
-
+func (dc *collector) collectAll() {
expvar.Do(func(kv expvar.KeyValue) {
- dataCollector.addExpVar(kv)
+ dc.addExpVar(kv)
})
-
- return dataCollector.dataPoints
}
-// combineMetricName joins parts of a hierarchical name with a "."
-func combineMetricName(parts ...string) string {
- return strings.Join(parts, ".")
+func (dc *collector) collectOne(name string, v expvar.Var) {
+ dc.addExpVar(expvar.KeyValue{
+ Key: name,
+ Value: v,
+ })
}
-func (dc *dataCollector) addInt(metric string, val int64, tags map[string]string) {
+func (dc *collector) addInt(metric string, val int64, tags map[string]string) {
dc.addFloat(metric, float64(val), tags)
}
-func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]string) {
+func (dc *collector) addFloat(metric string, val float64, tags map[string]string) {
var fullMetric string
- if len(dc.settings.prefix) > 0 {
- fullMetric = combineMetricName(dc.settings.prefix, metric)
+ if len(dc.prefix) > 0 {
+ fullMetric = combineMetricName(dc.prefix, metric)
} else {
fullMetric = metric
}
@@ -182,20 +78,20 @@ func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]st
}
fullTags := make(map[string]string)
- for k, v := range dc.settings.commonTags {
+ for k, v := range dc.commonTags {
fullTags[sanitize(k)] = sanitize(v)
}
for k, v := range tags {
fullTags[sanitize(k)] = sanitize(v)
}
- dp := dataPoint{
+ dp := &DataPoint{
Metric: sanitize(fullMetric),
Value: val,
Timestamp: float64(dc.timestamp),
Tags: fullTags,
}
- dc.dataPoints = append(dc.dataPoints, dp)
+ dc.data = append(dc.data, dp)
}
// addExpVar adds all the data points associated with a particular expvar to the list of
@@ -206,7 +102,7 @@ func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]st
//
// Generic unrecognized expvars are serialized to json and their int/float values are exported.
// Strings and lists in expvars are not exported.
-func (dc *dataCollector) addExpVar(kv expvar.KeyValue) {
+func (dc *collector) addExpVar(kv expvar.KeyValue) {
k := kv.Key
switch v := kv.Value.(type) {
case stats.FloatFunc:
@@ -268,24 +164,8 @@ func (dc *dataCollector) addExpVar(kv expvar.KeyValue) {
}
}
-// makeLabel builds a tag list with a single label + value.
-func makeLabel(labelName string, labelVal string) map[string]string {
- return map[string]string{labelName: labelVal}
-}
-
-// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it
-// apart into a map of label name -> label value.
-func makeLabels(labelNames []string, labelValsCombined string) map[string]string {
- tags := make(map[string]string)
- labelVals := strings.Split(labelValsCombined, ".")
- for i, v := range labelVals {
- tags[labelNames[i]] = v
- }
- return tags
-}
-
// addUnrecognizedExpvars recurses into a json object to pull out float64 variables to report.
-func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]any) {
+func (dc *collector) addUnrecognizedExpvars(prefix string, obj map[string]any) {
for k, v := range obj {
prefix := combineMetricName(prefix, k)
switch v := v.(type) {
@@ -298,7 +178,7 @@ func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]an
}
// addTimings converts a vitess Timings stat to something opentsdb can deal with.
-func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, prefix string) {
+func (dc *collector) addTimings(labels []string, timings *stats.Timings, prefix string) {
histograms := timings.Histograms()
for labelValsCombined, histogram := range histograms {
// If you prefer millisecond timings over nanoseconds you can pass 1000000 here instead of 1.
@@ -306,7 +186,7 @@ func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, pre
}
}
-func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) {
+func (dc *collector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) {
// TODO: OpenTSDB 2.3 doesn't have histogram support, although it's forthcoming.
// For simplicity we report each bucket as a different metric.
//
@@ -335,39 +215,23 @@ func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64
)
}
-// byMetric implements sort.Interface for []dataPoint based on the metric key
-// and then tag values (prioritized in tag name order). Having a consistent sort order
-// is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly
-// in the tests.
-type byMetric []dataPoint
-
-func (m byMetric) Len() int { return len(m) }
-func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
-func (m byMetric) Less(i, j int) bool {
- if m[i].Metric < m[j].Metric {
- return true
- }
-
- if m[i].Metric > m[j].Metric {
- return false
- }
+// combineMetricName joins parts of a hierarchical name with a "."
+func combineMetricName(parts ...string) string {
+ return strings.Join(parts, ".")
+}
- // Metric names are the same. We can use tag values to figure out the sort order.
- // The deciding tag will be the lexicographically earliest tag name where tag values differ.
- decidingTagName := ""
- result := false
- for tagName, iVal := range m[i].Tags {
- jVal, ok := m[j].Tags[tagName]
- if !ok {
- // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier.
- // This shouldn't happen in practice, though, if metric code is correct...
- return true
- }
+// makeLabel builds a tag list with a single label + value.
+func makeLabel(labelName string, labelVal string) map[string]string {
+ return map[string]string{labelName: labelVal}
+}
- if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") {
- decidingTagName = tagName
- result = iVal < jVal
- }
+// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it
+// apart into a map of label name -> label value.
+func makeLabels(labelNames []string, labelValsCombined string) map[string]string {
+ tags := make(map[string]string)
+ labelVals := strings.Split(labelValsCombined, ".")
+ for i, v := range labelVals {
+ tags[labelNames[i]] = v
}
- return result
+ return tags
}
diff --git a/go/stats/opentsdb/datapoint.go b/go/stats/opentsdb/datapoint.go
new file mode 100644
index 00000000000..42e69b84d47
--- /dev/null
+++ b/go/stats/opentsdb/datapoint.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+// DataPoint represents a single OpenTSDB data point.
+type DataPoint struct {
+ // Metric is the name of the metric.
+ // Example: sys.cpu.nice
+ Metric string `json:"metric"`
+ // Seconds or milliseconds since unix epoch.
+ Timestamp float64 `json:"timestamp"`
+ // Value is the measurement recorded for the metric.
+ Value float64 `json:"value"`
+ // Tags holds the tag name -> tag value pairs attached to the data point.
+ Tags map[string]string `json:"tags"`
+}
+
+// MarshalText renders the data point as one newline-terminated line made of
+// the metric name, the timestamp and the value separated by single spaces,
+// followed by one " key=value" field per tag.
+//
+// Tags are emitted in map iteration order, so their order is not stable
+// between calls. The returned error is always nil; it is kept so existing
+// callers keep compiling.
+func (dp *DataPoint) MarshalText() (string, error) {
+	var sb strings.Builder
+
+	// Writes into a strings.Builder never fail, so there is no error to check.
+	fmt.Fprintf(&sb, "%s %f %f", dp.Metric, dp.Timestamp, dp.Value)
+
+	for k, v := range dp.Tags {
+		fmt.Fprintf(&sb, " %s=%s", k, v)
+	}
+
+	sb.WriteByte('\n')
+
+	return sb.String(), nil
+}
+
+// unmarshalTextToData parses one line of space-separated text (without its
+// trailing newline) into dp.
+//
+// The first three fields are the metric name, the timestamp and the value;
+// every further field must be a single "key=value" tag. An error is returned
+// when fewer than three fields are present, when the timestamp or value is not
+// a valid float, or when a tag does not split into exactly one key and one
+// value. dp may already be partially populated when an error is returned.
+func unmarshalTextToData(dp *DataPoint, text []byte) error {
+	parts := strings.Split(string(text), " ")
+
+	if len(parts) < 3 {
+		// Technically every OpenTSDB time series requires at least one tag,
+		// but some of the metrics we send have zero.
+		return fmt.Errorf("require format: <metric> <timestamp> <value> [<tagk=tagv> <tagkN=tagvN>]")
+	}
+
+	dp.Metric = parts[0]
+
+	timestamp, err := strconv.ParseFloat(parts[1], 64)
+	if err != nil {
+		return err
+	}
+	dp.Timestamp = timestamp
+
+	value, err := strconv.ParseFloat(parts[2], 64)
+	if err != nil {
+		return err
+	}
+	dp.Value = value
+
+	for _, kv := range parts[3:] {
+		tagParts := strings.Split(kv, "=")
+		if len(tagParts) != 2 {
+			return fmt.Errorf("require tag format: <tagk=tagv>")
+		}
+		// The tag map is created lazily so a line without tags leaves it untouched.
+		if dp.Tags == nil {
+			dp.Tags = make(map[string]string)
+		}
+		dp.Tags[tagParts[0]] = tagParts[1]
+	}
+
+	return nil
+}
diff --git a/go/stats/opentsdb/datapoint_reader.go b/go/stats/opentsdb/datapoint_reader.go
new file mode 100644
index 00000000000..441be9eb7a1
--- /dev/null
+++ b/go/stats/opentsdb/datapoint_reader.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "bufio"
+ "io"
+)
+
+var newLineDelimiter = byte('\n')
+
+// DataPointReader parses bytes from io.Reader into DataPoints.
+type DataPointReader struct {
+ // reader buffers the underlying io.Reader so input can be consumed one line at a time.
+ reader *bufio.Reader
+}
+
+// NewDataPointReader returns a DataPointReader that reads from r through a
+// buffered reader.
+func NewDataPointReader(r io.Reader) *DataPointReader {
+ return &DataPointReader{
+ reader: bufio.NewReader(r),
+ }
+}
+
+// Read consumes the next newline-terminated line from the underlying
+// io.Reader and parses it into a DataPoint.
+//
+// A read error (including io.EOF) or a parse error is returned as-is, in which
+// case the returned DataPoint is nil.
+func (tr *DataPointReader) Read() (*DataPoint, error) {
+	line, err := tr.reader.ReadBytes(newLineDelimiter)
+	if err != nil {
+		return nil, err
+	}
+
+	// A successful ReadBytes always ends with the delimiter; strip it before parsing.
+	line = line[:len(line)-1]
+
+	parsed := &DataPoint{}
+	if err := unmarshalTextToData(parsed, line); err != nil {
+		return nil, err
+	}
+
+	return parsed, nil
+}
diff --git a/go/stats/opentsdb/doc.go b/go/stats/opentsdb/doc.go
new file mode 100644
index 00000000000..88c22a58c70
--- /dev/null
+++ b/go/stats/opentsdb/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package opentsdb adds support for pushing stats to opentsdb.
+package opentsdb
diff --git a/go/stats/opentsdb/file_writer.go b/go/stats/opentsdb/file_writer.go
new file mode 100644
index 00000000000..7f2d2f2ccc7
--- /dev/null
+++ b/go/stats/opentsdb/file_writer.go
@@ -0,0 +1,52 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "io"
+ "os"
+)
+
+// fileWriter writes data points in their text encoding to an io.WriteCloser.
+type fileWriter struct {
+ // writer is the destination the marshaled data points are written to.
+ writer io.WriteCloser
+}
+
+// newFileWriter opens path for synchronous, append-only writing, creating the
+// file with mode 0644 when it does not exist yet, and wraps it in a writer.
+func newFileWriter(path string) (writer, error) {
+	const openFlags = os.O_APPEND | os.O_CREATE | os.O_SYNC | os.O_WRONLY
+
+	file, err := os.OpenFile(path, openFlags, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	fw := &fileWriter{writer: file}
+	return fw, nil
+}
+
+// Write appends the text encoding of every data point in data to the
+// underlying writer.
+//
+// All points are marshaled into one buffer first and handed over in a single
+// Write call. When the destination is a file opened with O_SYNC, as
+// newFileWriter does, a batch then costs one synchronous write instead of one
+// per data point, and a marshaling failure leaves the destination untouched.
+// Nothing is written when data is empty.
+func (fw *fileWriter) Write(data []*DataPoint) error {
+	var buf []byte
+	for _, d := range data {
+		text, err := d.MarshalText()
+		if err != nil {
+			return err
+		}
+		buf = append(buf, text...)
+	}
+
+	if len(buf) == 0 {
+		return nil
+	}
+
+	_, err := fw.writer.Write(buf)
+	return err
+}
diff --git a/go/stats/opentsdb/flags.go b/go/stats/opentsdb/flags.go
new file mode 100644
index 00000000000..8ccd0279981
--- /dev/null
+++ b/go/stats/opentsdb/flags.go
@@ -0,0 +1,38 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "github.com/spf13/pflag"
+
+ "vitess.io/vitess/go/vt/servenv"
+)
+
+var (
+ openTSDBURI string
+)
+
+// registerFlags adds the --opentsdb_uri string flag to fs, storing its value in
+// openTSDBURI and using the variable's current value as the default.
+func registerFlags(fs *pflag.FlagSet) {
+ fs.StringVar(&openTSDBURI, "opentsdb_uri", openTSDBURI, "URI of opentsdb /api/put method")
+}
+
+// init hooks registerFlags into flag parsing for the vtbackup, vtctld, vtgate
+// and vttablet commands.
+func init() {
+	for _, cmd := range []string{"vtbackup", "vtctld", "vtgate", "vttablet"} {
+		servenv.OnParseFor(cmd, registerFlags)
+	}
+}
diff --git a/go/stats/opentsdb/http_writer.go b/go/stats/opentsdb/http_writer.go
new file mode 100644
index 00000000000..7b7801d7f77
--- /dev/null
+++ b/go/stats/opentsdb/http_writer.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+)
+
+// httpWriter sends data points as JSON to an HTTP endpoint.
+type httpWriter struct {
+ // client performs the POST requests.
+ client *http.Client
+ // uri is the address the data points are posted to.
+ uri string
+}
+
+// newHTTPWriter returns an httpWriter that posts to uri using client.
+func newHTTPWriter(client *http.Client, uri string) *httpWriter {
+ return &httpWriter{
+ client: client,
+ uri: uri,
+ }
+}
+
+// Write encodes data as JSON and POSTs it to the configured URI with an
+// "application/json" content type.
+//
+// Only marshaling and transport errors are reported: the response body is
+// closed without being read and the response status code is not inspected.
+func (hw *httpWriter) Write(data []*DataPoint) error {
+	payload, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+
+	resp, err := hw.client.Post(hw.uri, "application/json", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	return nil
+}
diff --git a/go/stats/opentsdb/init.go b/go/stats/opentsdb/init.go
new file mode 100644
index 00000000000..51186ad7650
--- /dev/null
+++ b/go/stats/opentsdb/init.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/url"
+ "sort"
+
+ "vitess.io/vitess/go/stats"
+ "vitess.io/vitess/go/vt/log"
+ "vitess.io/vitess/go/vt/servenv"
+)
+
+var singletonBackend stats.PushBackend
+
+// Init schedules the creation of the singleton opentsdb backend and its
+// registration as a PushBackend. When the backend cannot be created the
+// failure is logged and nothing is registered. The prefix argument is an
+// optional string to prepend to the name of every data point reported.
+func Init(prefix string) {
+	// Deferred to servenv.OnRun() rather than done in init because it relies on
+	// parsed flags and on logging being available.
+	servenv.OnRun(func() {
+		log.Info("Initializing opentsdb backend...")
+
+		created, err := InitWithoutServenv(prefix)
+		if err != nil {
+			log.Infof("Failed to initialize singleton opentsdb backend: %v", err)
+			return
+		}
+
+		singletonBackend = created
+		log.Info("Initialized opentsdb backend.")
+	})
+}
+
+// InitWithoutServenv creates the opentsdb backend immediately, without
+// waiting for servenv.OnRun.
+//
+// On success the backend is registered as the "opentsdb" push backend and a
+// /debug/opentsdb HTTP handler is installed that collects all current data
+// points, sorts them and serves them as indented JSON. The error from
+// newBackend is returned unchanged when the backend cannot be created.
+func InitWithoutServenv(prefix string) (stats.PushBackend, error) {
+	b, err := newBackend(prefix)
+	if err != nil {
+		return nil, err
+	}
+
+	stats.RegisterPushBackend("opentsdb", b)
+
+	servenv.HTTPHandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) {
+		c := b.collector()
+		c.collectAll()
+		data := c.data
+		sort.Sort(byMetric(data))
+
+		// Use a distinct name for the payload so the backend b is not shadowed.
+		body, err := json.MarshalIndent(data, "", " ")
+		if err != nil {
+			// Report the failure as a server error instead of a 200 JSON response.
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		w.Header().Set("Content-Type", "application/json; charset=utf-8")
+		w.Write(body)
+	})
+
+	return b, nil
+}
+
+// newBackend builds a backend from the --opentsdb_uri flag value.
+//
+// A URI with the "file" scheme selects a file writer on the URI's path; any
+// other URI is used as-is as the target of an HTTP writer. An error is
+// returned when the flag is empty, when the URI cannot be parsed, or when the
+// file writer cannot be created; underlying errors are wrapped.
+func newBackend(prefix string) (*backend, error) {
+	if openTSDBURI == "" {
+		return nil, fmt.Errorf("cannot create opentsdb PushBackend with empty --opentsdb_uri")
+	}
+
+	u, err := url.Parse(openTSDBURI)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse --opentsdb_uri %s: %w", openTSDBURI, err)
+	}
+
+	var w writer
+	if u.Scheme == "file" {
+		// Use the file API when the uri is in format file://...
+		w, err = newFileWriter(u.Path)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create file-based writer for --opentsdb_uri %s: %w", openTSDBURI, err)
+		}
+	} else {
+		// NOTE(review): this client has no timeout configured — confirm that is intended.
+		w = newHTTPWriter(&http.Client{}, openTSDBURI)
+	}
+
+	return &backend{
+		prefix:     prefix,
+		commonTags: stats.ParseCommonTags(stats.CommonTags),
+		writer:     w,
+	}, nil
+}
diff --git a/go/stats/opentsdb/opentsdb_test.go b/go/stats/opentsdb/opentsdb_test.go
index 0e8ff240500..940ee845ada 100644
--- a/go/stats/opentsdb/opentsdb_test.go
+++ b/go/stats/opentsdb/opentsdb_test.go
@@ -352,15 +352,16 @@ func TestOpenTsdbTimings(t *testing.T) {
}
func checkOutput(t *testing.T, statName string, wantJSON string) {
- backend := &openTSDBBackend{
+ b := &backend{
prefix: "vtgate",
commonTags: map[string]string{"host": "localhost"},
}
timestamp := int64(1234)
- dc := &dataCollector{
- settings: backend,
- timestamp: timestamp,
+ dc := &collector{
+ commonTags: b.commonTags,
+ prefix: b.prefix,
+ timestamp: timestamp,
}
found := false
expvar.Do(func(kv expvar.KeyValue) {
@@ -368,9 +369,9 @@ func checkOutput(t *testing.T, statName string, wantJSON string) {
found = true
dc.addExpVar(kv)
- sort.Sort(byMetric(dc.dataPoints))
+ sort.Sort(byMetric(dc.data))
- gotBytes, err := json.MarshalIndent(dc.dataPoints, "", " ")
+ gotBytes, err := json.MarshalIndent(dc.data, "", " ")
if err != nil {
t.Errorf("Failed to marshal json: %v", err)
return
diff --git a/go/stats/opentsdb/writer.go b/go/stats/opentsdb/writer.go
new file mode 100644
index 00000000000..49d221cc782
--- /dev/null
+++ b/go/stats/opentsdb/writer.go
@@ -0,0 +1,21 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package opentsdb
+
+// writer is a destination that a batch of data points can be written to.
+type writer interface {
+ // Write sends the given data points to the destination.
+ Write([]*DataPoint) error
+}
diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go
index 269b185ff7c..f791d7b742d 100644
--- a/go/stats/statsd/statsd.go
+++ b/go/stats/statsd/statsd.go
@@ -219,7 +219,7 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
}
}
-// PushAll flush out the pending metrics
+// PushAll flushes out the pending metrics
func (sb StatsBackend) PushAll() error {
expvar.Do(func(kv expvar.KeyValue) {
sb.addExpVar(kv)
@@ -229,3 +229,15 @@ func (sb StatsBackend) PushAll() error {
}
return nil
}
+
+// PushOne adds the single provided metric under the given name and then
+// flushes the statsd client, returning the flush error if there is one.
+func (sb StatsBackend) PushOne(name string, v stats.Variable) error {
+	kv := expvar.KeyValue{Key: name, Value: v}
+	sb.addExpVar(kv)
+
+	return sb.statsdClient.Flush()
+}
diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go
index 408cc64a21b..5e80d5d3cc3 100644
--- a/go/test/endtoend/backup/vtbackup/backup_only_test.go
+++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go
@@ -19,7 +19,9 @@ package vtbackup
import (
"context"
"encoding/json"
+ "errors"
"fmt"
+ "io"
"os"
"path"
"strings"
@@ -30,6 +32,7 @@ import (
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
+ "vitess.io/vitess/go/stats/opentsdb"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
@@ -59,8 +62,9 @@ func TestTabletInitialBackup(t *testing.T) {
waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
- vtBackup(t, true, false, false)
+ dataPointReader := vtBackup(t, true, false, false)
verifyBackupCount(t, shardKsName, 1)
+ verifyBackupStats(t, dataPointReader, true /* initialBackup */)
// Initialize the tablets
initTablets(t, false, false)
@@ -144,11 +148,13 @@ func firstBackupTest(t *testing.T, tabletType string) {
// backup the replica
log.Infof("taking backup %s", time.Now())
- vtBackup(t, false, true, true)
+ dataPointReader := vtBackup(t, false, true, true)
log.Infof("done taking backup %s", time.Now())
// check that the backup shows up in the listing
verifyBackupCount(t, shardKsName, len(backups)+1)
+ // check that backup stats are what we expect
+ verifyBackupStats(t, dataPointReader, false /* initialBackup */)
// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
@@ -173,16 +179,24 @@ func firstBackupTest(t *testing.T, tabletType string) {
verifyBackupCount(t, shardKsName, 0)
}
-func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) {
+func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock")
require.Nil(t, err)
defer os.Remove(mysqlSocket.Name())
+ // Prepare opentsdb stats file path.
+ statsPath := path.Join(t.TempDir(), fmt.Sprintf("opentsdb.%s.txt", t.Name()))
+
// Take the back using vtbackup executable
extraArgs := []string{
"--allow_first_backup",
"--db-credentials-file", dbCredentialFile,
"--mysql_socket", mysqlSocket.Name(),
+
+ // Use opentsdb for stats.
+ "--stats_backend", "opentsdb",
+ // Write stats to file for reading afterwards.
+ "--opentsdb_uri", fmt.Sprintf("file://%s", statsPath),
}
if restartBeforeBackup {
extraArgs = append(extraArgs, "--restart_before_backup")
@@ -201,6 +215,10 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo
log.Infof("starting backup tablet %s", time.Now())
err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...)
require.Nil(t, err)
+
+ f, err := os.OpenFile(statsPath, os.O_RDONLY, 0)
+ require.NoError(t, err)
+ return opentsdb.NewDataPointReader(f)
}
func verifyBackupCount(t *testing.T, shardKsName string, expected int) []string {
@@ -413,3 +431,73 @@ func waitForReplicationToCatchup(tablets []cluster.Vttablet) bool {
}
}
}
+
+// verifyBackupStats reads every data point from dataPointReader and checks
+// the "vtbackup.phase" metric: each value is 0 or 1, each phase first reports
+// 0, an active phase reports 0 on its next data point, at most one phase is
+// active at a time, and the phases went from active to inactive in the order
+// expected for the given kind of backup. Other metrics are ignored.
+func verifyBackupStats(t *testing.T, dataPointReader *opentsdb.DataPointReader, initialBackup bool) {
+	// During execution, the following phases will become active, in order.
+	var expectActivePhases []string
+	if initialBackup {
+		expectActivePhases = []string{
+			"initialbackup",
+		}
+	} else {
+		expectActivePhases = []string{
+			"restorelastbackup",
+			"catchupreplication",
+			"takenewbackup",
+		}
+	}
+
+	// Sequence of phase activity.
+	activePhases := make([]string, 0)
+
+	// Last seen phase values.
+	phaseValues := make(map[string]int64)
+
+	// Scan for phase activity until we're out of stats to scan.
+	for {
+		dataPoint, err := dataPointReader.Read()
+		if errors.Is(err, io.EOF) {
+			break
+		}
+		// Any other read or parse error leaves dataPoint nil; fail the test here
+		// rather than dereferencing it below.
+		require.NoError(t, err)
+
+		// We're only interested in "vtbackup.phase" metrics in this test.
+		if dataPoint.Metric != "vtbackup.phase" {
+			continue
+		}
+
+		phase := dataPoint.Tags["phase"]
+		value := int64(dataPoint.Value)
+		lastValue, ok := phaseValues[phase]
+
+		// The value should always be 0 or 1.
+		require.True(t, int64(0) == value || int64(1) == value)
+
+		// The first time the phase is reported, it should be 0.
+		if !ok {
+			require.Equal(t, int64(0), value)
+		}
+
+		// Eventually the phase should go active. The next time it reports,
+		// it should go inactive.
+		if lastValue == 1 {
+			require.Equal(t, int64(0), value)
+		}
+
+		// Record current value.
+		phaseValues[phase] = value
+
+		// Add phase to sequence once it goes from active to inactive.
+		if lastValue == 1 && value == 0 {
+			activePhases = append(activePhases, phase)
+		}
+
+		// Verify at most one phase is active.
+		activeCount := 0
+		for _, value := range phaseValues {
+			if value == int64(0) {
+				continue
+			}
+
+			activeCount++
+			require.LessOrEqual(t, activeCount, 1)
+		}
+	}
+
+	// Verify phase sequences.
+	require.Equal(t, expectActivePhases, activePhases)
+}
diff --git a/go/test/endtoend/cluster/vtbackup_process.go b/go/test/endtoend/cluster/vtbackup_process.go
index ba508e8d593..57350922a21 100644
--- a/go/test/endtoend/cluster/vtbackup_process.go
+++ b/go/test/endtoend/cluster/vtbackup_process.go
@@ -69,8 +69,7 @@ func (vtbackup *VtbackupProcess) Setup() (err error) {
//Backup Arguments are not optional
"--backup_storage_implementation", "file",
- "--file_backup_storage_root",
- path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"),
+ "--file_backup_storage_root", path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"),
)
if vtbackup.initialBackup {