From ebbd418809ad4043835cf8cfe90ac9d6d78e740b Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 20 Sep 2023 10:58:11 +0100 Subject: [PATCH] metrics: change vtbackup_duration_by_phase to binary-valued vtbackup_phase (#12973) Signed-off-by: Max Englander --- changelog/18.0/18.0.0/summary.md | 26 ++- go/cmd/vtbackup/plugin_opentsdb.go | 25 +++ go/cmd/vtbackup/vtbackup.go | 83 +++++-- go/flags/endtoend/vtbackup.txt | 1 + go/stats/counters.go | 23 ++ go/stats/export.go | 28 ++- go/stats/opentsdb/backend.go | 58 +++++ go/stats/opentsdb/by_metric.go | 54 +++++ .../opentsdb/{opentsdb.go => collector.go} | 212 ++++-------------- go/stats/opentsdb/datapoint.go | 90 ++++++++ go/stats/opentsdb/datapoint_reader.go | 53 +++++ go/stats/opentsdb/doc.go | 18 ++ go/stats/opentsdb/file_writer.go | 52 +++++ go/stats/opentsdb/flags.go | 38 ++++ go/stats/opentsdb/http_writer.go | 51 +++++ go/stats/opentsdb/init.go | 104 +++++++++ go/stats/opentsdb/opentsdb_test.go | 13 +- go/stats/opentsdb/writer.go | 21 ++ go/stats/statsd/statsd.go | 14 +- .../backup/vtbackup/backup_only_test.go | 94 +++++++- go/test/endtoend/cluster/vtbackup_process.go | 3 +- 21 files changed, 845 insertions(+), 216 deletions(-) create mode 100644 go/cmd/vtbackup/plugin_opentsdb.go create mode 100644 go/stats/opentsdb/backend.go create mode 100644 go/stats/opentsdb/by_metric.go rename go/stats/opentsdb/{opentsdb.go => collector.go} (54%) create mode 100644 go/stats/opentsdb/datapoint.go create mode 100644 go/stats/opentsdb/datapoint_reader.go create mode 100644 go/stats/opentsdb/doc.go create mode 100644 go/stats/opentsdb/file_writer.go create mode 100644 go/stats/opentsdb/flags.go create mode 100644 go/stats/opentsdb/http_writer.go create mode 100644 go/stats/opentsdb/init.go create mode 100644 go/stats/opentsdb/writer.go diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md index 61cf6be1cf8..db2494d3794 100644 --- a/changelog/18.0/18.0.0/summary.md +++ b/changelog/18.0/18.0.0/summary.md @@ -16,8 +16,10 @@ - [Deleted `V3` planner](#deleted-v3) - [Deleted `k8stopo`](#deleted-k8stopo) - [Deleted `vtgr`](#deleted-vtgr) + - [Deprecated VTBackup stat `DurationByPhase`](#deprecated-vtbackup-stat-duration-by-phase) - **[New stats](#new-stats)** - [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters) + - [VTBackup stat `Phase`](#vtbackup-stat-phase) - [VTBackup stat `PhaseStatus`](#vtbackup-stat-phase-status) - **[VTTablet](#vttablet)** - [VTTablet: New ResetSequences RPC](#vttablet-new-rpc-reset-sequences) @@ -115,17 +117,37 @@ the `k8stopo` has been removed. The `vtgr` has been deprecated in Vitess 17, also see https://github.com/vitessio/vitess/issues/13300. With Vitess 18 `vtgr` has been removed. +#### Deprecated VTbackup stat `DurationByPhase` + +VTBackup stat `DurationByPhase` is deprecated. Use the binary-valued `Phase` stat instead. + ### New stats #### VTGate Vindex unknown parameters The VTGate stat `VindexUnknownParameters` gauges unknown Vindex parameters found in the latest VSchema pulled from the topology. +#### VTBackup `Phase` stat + +In v17, the `vtbackup` stat `DurationByPhase` stat was added measuring the time spent by `vtbackup` in each phase. This stat turned out to be awkward to use in production, and has been replaced in v18 by a binary-valued `Phase` stat. + +`Phase` reports a 1 (active) or a 0 (inactive) for each of the following phases: + + * `CatchupReplication` + * `InitialBackup` + * `RestoreLastBackup` + * `TakeNewBackup` + +To calculate how long `vtbackup` has spent in a given phase, sum the 1-valued data points over time and multiply by the data collection or reporting interval. For example, in Prometheus: + +``` +sum_over_time(vtbackup_phase{phase="TakeNewBackup"}) * +``` #### VTBackup `PhaseStatus` stat `PhaseStatus` reports a 1 (active) or a 0 (inactive) for each of the following phases and statuses: - * `CatchUpReplication` phase has statuses `Stalled` and `Stopped`. + * `CatchupReplication` phase has statuses `Stalled` and `Stopped`. * `Stalled` is set to `1` when replication stops advancing. * `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary. @@ -165,4 +187,4 @@ removing Vitess support. #### New Durability Policies -2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs. \ No newline at end of file +2 new inbuilt durability policies have been added to Vitess in this release namely `semi_sync_with_rdonly_ack` and `cross_cell_with_rdonly_ack`. These policies are exactly like `semi_sync` and `cross_cell` respectively, and differ just in the part where the rdonly tablets can also send semi-sync ACKs. diff --git a/go/cmd/vtbackup/plugin_opentsdb.go b/go/cmd/vtbackup/plugin_opentsdb.go new file mode 100644 index 00000000000..44ac886d420 --- /dev/null +++ b/go/cmd/vtbackup/plugin_opentsdb.go @@ -0,0 +1,25 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import "vitess.io/vitess/go/stats/opentsdb" + +// This plugin imports opentsdb to register the opentsdb stats backend. + +func init() { + opentsdb.Init("vtbackup") +} diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index f27a991d35e..ebf83526cad 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -100,9 +100,12 @@ const ( // forever for things that should be quick. operationTimeout = 1 * time.Minute - phaseNameCatchUpReplication = "CatchUpReplication" - phaseStatusCatchUpReplicationStalled = "Stalled" - phaseStatusCatchUpReplicationStopped = "Stopped" + phaseNameCatchupReplication = "CatchupReplication" + phaseNameInitialBackup = "InitialBackup" + phaseNameRestoreLastBackup = "RestoreLastBackup" + phaseNameTakeNewBackup = "TakeNewBackup" + phaseStatusCatchupReplicationStalled = "Stalled" + phaseStatusCatchupReplicationStopped = "Stopped" ) var ( @@ -127,20 +130,44 @@ var ( detachedMode bool keepAliveTimeout = 0 * time.Second disableRedoLog = false - durationByPhase = stats.NewGaugesWithSingleLabel( + + // Deprecated, use "Phase" instead. + deprecatedDurationByPhase = stats.NewGaugesWithSingleLabel( "DurationByPhaseSeconds", - "How long it took vtbackup to perform each phase (in seconds).", + "[DEPRECATED] How long it took vtbackup to perform each phase (in seconds).", + "phase", + ) + + // This gauge is updated 3*N times during the course of a vtbackup run, + // where N is the number of different phases vtbackup transitions through. + // Once to initialize to 0, another time to set the phase to active (1), + // and another to deactivate the phase (back to 0). + // + // At most a single phase is active at a given time. + // + // The sync gauge immediately reports changes to push-backed backends. + // The benefit of the sync gauge is that it makes verifying stats in + // integration tests a lot more tractable. + phase = stats.NewSyncGaugesWithSingleLabel( + "Phase", + "Active phase.", "phase", ) + phaseNames = []string{ + phaseNameCatchupReplication, + phaseNameInitialBackup, + phaseNameRestoreLastBackup, + phaseNameTakeNewBackup, + } phaseStatus = stats.NewGaugesWithMultiLabels( "PhaseStatus", "Internal state of vtbackup phase.", []string{"phase", "status"}, ) phaseStatuses = map[string][]string{ - phaseNameCatchUpReplication: { - phaseStatusCatchUpReplicationStalled, - phaseStatusCatchUpReplicationStopped, + phaseNameCatchupReplication: { + phaseStatusCatchupReplicationStalled, + phaseStatusCatchupReplicationStopped, }, } ) @@ -219,6 +246,9 @@ func main() { defer topoServer.Close() // Initialize stats. + for _, phaseName := range phaseNames { + phase.Set(phaseName, int64(0)) + } for phaseName, statuses := range phaseStatuses { for _, status := range statuses { phaseStatus.Set([]string{phaseName, status}, 0) @@ -294,7 +324,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back if err := mysqld.Init(initCtx, mycnf, initDBSQLFile); err != nil { return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err) } - durationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds())) + deprecatedDurationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds())) // Shut down mysqld when we're done. defer func() { // Be careful not to use the original context, because we don't want to @@ -358,14 +388,19 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back backupParams.BackupTime = time.Now() // Now we're ready to take the backup. + phase.Set(phaseNameInitialBackup, int64(1)) + defer phase.Set(phaseNameInitialBackup, int64(0)) if err := mysqlctl.Backup(ctx, backupParams); err != nil { return fmt.Errorf("backup failed: %v", err) } - durationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds())) + deprecatedDurationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds())) log.Info("Initial backup successful.") + phase.Set(phaseNameInitialBackup, int64(0)) return nil } + phase.Set(phaseNameRestoreLastBackup, int64(1)) + defer phase.Set(phaseNameRestoreLastBackup, int64(0)) backupDir := mysqlctl.GetBackupDir(initKeyspace, initShard) log.Infof("Restoring latest backup from directory %v", backupDir) restoreAt := time.Now() @@ -397,7 +432,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back default: return fmt.Errorf("can't restore from backup: %v", err) } - durationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds())) + deprecatedDurationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds())) + phase.Set(phaseNameRestoreLastBackup, int64(0)) // As of MySQL 8.0.21, you can disable redo logging using the ALTER INSTANCE // DISABLE INNODB REDO_LOG statement. This functionality is intended for @@ -455,6 +491,9 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back backupParams.BackupTime = time.Now() // Wait for replication to catch up. + phase.Set(phaseNameCatchupReplication, int64(1)) + defer phase.Set(phaseNameCatchupReplication, int64(0)) + var ( lastStatus replication.ReplicationStatus status replication.ReplicationStatus @@ -479,26 +518,27 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back // We're caught up on replication to at least the point the primary // was at when this vtbackup run started. log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime)) - durationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds())) + deprecatedDurationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds())) break } if !lastStatus.Position.IsZero() { if status.Position.Equal(lastStatus.Position) { - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 1) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 1) } else { - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 0) } } if !status.Healthy() { log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.") - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 1) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 1) if err := startReplication(ctx, mysqld, topoServer); err != nil { log.Warningf("Failed to restart replication: %v", err) } } else { - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0) } } + phase.Set(phaseNameCatchupReplication, int64(0)) // Stop replication and see where we are. if err := mysqld.StopReplication(nil); err != nil { @@ -514,8 +554,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back if !status.Position.AtLeast(primaryPos) && status.Position.Equal(restorePos) { return fmt.Errorf("not taking backup: replication did not make any progress from restore point: %v", restorePos) } - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0) - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0) // Re-enable redo logging. if disabledRedoLog { @@ -539,15 +579,18 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back if err := mysqld.Start(ctx, mycnf); err != nil { return fmt.Errorf("Could not start MySQL after full shutdown: %v", err) } - durationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds())) + deprecatedDurationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds())) } // Now we can take a new backup. backupAt := time.Now() + phase.Set(phaseNameTakeNewBackup, int64(1)) + defer phase.Set(phaseNameTakeNewBackup, int64(0)) if err := mysqlctl.Backup(ctx, backupParams); err != nil { return fmt.Errorf("error taking backup: %v", err) } - durationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds())) + deprecatedDurationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds())) + phase.Set(phaseNameTakeNewBackup, int64(0)) // Return a non-zero exit code if we didn't meet the replication position // goal, even though we took a backup that pushes the high-water mark up. diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index 44cb2a08462..46e4efea301 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -134,6 +134,7 @@ Usage of vtbackup: --mysql_server_version string MySQL server version to advertise. (default "8.0.30-Vitess") --mysql_socket string path to the mysql socket --mysql_timeout duration how long to wait for mysqld startup (default 5m0s) + --opentsdb_uri string URI of opentsdb /api/put method --port int port for the server --pprof strings enable profiling --purge_logs_interval duration how often try to remove old logs (default 1h0m0s) diff --git a/go/stats/counters.go b/go/stats/counters.go index 371cbd53818..a4dfc0dcb1f 100644 --- a/go/stats/counters.go +++ b/go/stats/counters.go @@ -321,6 +321,29 @@ func (g *GaugesWithSingleLabel) Set(name string, value int64) { g.counters.set(name, value) } +// SyncGaugesWithSingleLabel is a GaugesWithSingleLabel that proactively pushes +// stats to push-based backends when Set is called. +type SyncGaugesWithSingleLabel struct { + GaugesWithSingleLabel + name string +} + +// NewSyncGaugesWithSingleLabel creates a new SyncGaugesWithSingleLabel. +func NewSyncGaugesWithSingleLabel(name, help, label string, tags ...string) *SyncGaugesWithSingleLabel { + return &SyncGaugesWithSingleLabel{ + GaugesWithSingleLabel: *NewGaugesWithSingleLabel(name, help, label, tags...), + name: name, + } +} + +// Set sets the value of a named gauge. +func (sg *SyncGaugesWithSingleLabel) Set(name string, value int64) { + sg.GaugesWithSingleLabel.Set(name, value) + if sg.name != "" { + _ = pushOne(sg.name, &sg.GaugesWithSingleLabel) + } +} + // GaugesWithMultiLabels is a CountersWithMultiLabels implementation where // the values can go up and down. type GaugesWithMultiLabels struct { diff --git a/go/stats/export.go b/go/stats/export.go index e98ef0a969c..8bda85c87b2 100644 --- a/go/stats/export.go +++ b/go/stats/export.go @@ -121,6 +121,22 @@ func Publish(name string, v expvar.Var) { publish(name, v) } +func pushAll() error { + backend, ok := pushBackends[statsBackend] + if !ok { + return fmt.Errorf("no PushBackend registered with name %s", statsBackend) + } + return backend.PushAll() +} + +func pushOne(name string, v Variable) error { + backend, ok := pushBackends[statsBackend] + if !ok { + return fmt.Errorf("no PushBackend registered with name %s", statsBackend) + } + return backend.PushOne(name, v) +} + // StringMapFuncWithMultiLabels is a multidimensional string map publisher. // // Map keys are compound names made with joining multiple strings with '.', @@ -183,8 +199,10 @@ func publish(name string, v expvar.Var) { // to be pushed to it. It's used to support push-based metrics backends, as expvar // by default only supports pull-based ones. type PushBackend interface { - // PushAll pushes all stats from expvar to the backend + // PushAll pushes all stats from expvar to the backend. PushAll() error + // PushOne pushes a single stat from expvar to the backend. + PushOne(name string, v Variable) error } var pushBackends = make(map[string]PushBackend) @@ -214,13 +232,7 @@ func emitToBackend(emitPeriod *time.Duration) { ticker := time.NewTicker(*emitPeriod) defer ticker.Stop() for range ticker.C { - backend, ok := pushBackends[statsBackend] - if !ok { - log.Errorf("No PushBackend registered with name %s", statsBackend) - return - } - err := backend.PushAll() - if err != nil { + if err := pushAll(); err != nil { // TODO(aaijazi): This might cause log spam... log.Warningf("Pushing stats to backend %v failed: %v", statsBackend, err) } diff --git a/go/stats/opentsdb/backend.go b/go/stats/opentsdb/backend.go new file mode 100644 index 00000000000..e5c766ba797 --- /dev/null +++ b/go/stats/opentsdb/backend.go @@ -0,0 +1,58 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "time" + + "vitess.io/vitess/go/stats" +) + +// backend implements stats.PushBackend +type backend struct { + // The prefix is the name of the binary (vtgate, vttablet, etc.) and will be + // prepended to all the stats reported. + prefix string + // Tags that should be included with every data point. If there's a tag name + // collision between the common tags and a single data point's tags, the data + // point tag will override the common tag. + commonTags map[string]string + // writer is used to send data points somewhere (file, http, ...). + writer writer +} + +// PushAll pushes all stats to OpenTSDB +func (b *backend) PushAll() error { + collector := b.collector() + collector.collectAll() + return b.writer.Write(collector.data) +} + +// PushOne pushes a single stat to OpenTSDB +func (b *backend) PushOne(name string, v stats.Variable) error { + collector := b.collector() + collector.collectOne(name, v) + return b.writer.Write(collector.data) +} + +func (b *backend) collector() *collector { + return &collector{ + commonTags: b.commonTags, + prefix: b.prefix, + timestamp: time.Now().Unix(), + } +} diff --git a/go/stats/opentsdb/by_metric.go b/go/stats/opentsdb/by_metric.go new file mode 100644 index 00000000000..5177109a18e --- /dev/null +++ b/go/stats/opentsdb/by_metric.go @@ -0,0 +1,54 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +// byMetric implements sort.Interface for []*DataPoint based on the metric key +// and then tag values (prioritized in tag name order). Having a consistent sort order +// is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly +// in the tests. +type byMetric []*DataPoint + +func (m byMetric) Len() int { return len(m) } +func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] } +func (m byMetric) Less(i, j int) bool { + if m[i].Metric < m[j].Metric { + return true + } + + if m[i].Metric > m[j].Metric { + return false + } + + // Metric names are the same. We can use tag values to figure out the sort order. + // The deciding tag will be the lexicographically earliest tag name where tag values differ. + decidingTagName := "" + result := false + for tagName, iVal := range m[i].Tags { + jVal, ok := m[j].Tags[tagName] + if !ok { + // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier. + // This shouldn't happen in practice, though, if metric code is correct... + return true + } + + if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") { + decidingTagName = tagName + result = iVal < jVal + } + } + return result +} diff --git a/go/stats/opentsdb/opentsdb.go b/go/stats/opentsdb/collector.go similarity index 54% rename from go/stats/opentsdb/opentsdb.go rename to go/stats/opentsdb/collector.go index 3e85052b5f4..9b870815067 100644 --- a/go/stats/opentsdb/opentsdb.go +++ b/go/stats/opentsdb/collector.go @@ -14,151 +14,47 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package opentsdb adds support for pushing stats to opentsdb. package opentsdb import ( "bytes" "encoding/json" "expvar" - "net/http" - "sort" "strings" - "time" "unicode" - "github.com/spf13/pflag" - "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/servenv" ) -var openTsdbURI string - -func registerFlags(fs *pflag.FlagSet) { - fs.StringVar(&openTsdbURI, "opentsdb_uri", openTsdbURI, "URI of opentsdb /api/put method") -} - -func init() { - servenv.OnParseFor("vtctld", registerFlags) - servenv.OnParseFor("vtgate", registerFlags) - servenv.OnParseFor("vttablet", registerFlags) -} - -// dataPoint represents a single OpenTSDB data point. -type dataPoint struct { - // Example: sys.cpu.nice - Metric string `json:"metric"` - // Seconds or milliseconds since unix epoch. - Timestamp float64 `json:"timestamp"` - Value float64 `json:"value"` - Tags map[string]string `json:"tags"` -} - -// sendDataPoints pushes a list of data points to openTSDB. -// All other code in this file is just to support getting this function called -// with all stats represented as data points. -func sendDataPoints(data []dataPoint) error { - json, err := json.Marshal(data) - if err != nil { - return err - } - - resp, err := http.Post(openTsdbURI, "application/json", bytes.NewReader(json)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// openTSDBBackend implements stats.PushBackend -type openTSDBBackend struct { - // The prefix is the name of the binary (vtgate, vttablet, etc.) and will be - // prepended to all the stats reported. - prefix string - // Tags that should be included with every data point. If there's a tag name - // collision between the common tags and a single data point's tags, the data - // point tag will override the common tag. +// collector tracks state for a single pass of stats reporting / data collection. +type collector struct { commonTags map[string]string -} - -// dataCollector tracks state for a single pass of stats reporting / data collection. -type dataCollector struct { - settings *openTSDBBackend + data []*DataPoint + prefix string timestamp int64 - dataPoints []dataPoint -} - -// Init attempts to create a singleton openTSDBBackend and register it as a PushBackend. -// If it fails to create one, this is a noop. The prefix argument is an optional string -// to prepend to the name of every data point reported. -func Init(prefix string) { - // Needs to happen in servenv.OnRun() instead of init because it requires flag parsing and logging - servenv.OnRun(func() { - InitWithoutServenv(prefix) - }) -} - -// InitWithoutServenv initializes the opentsdb without servenv -func InitWithoutServenv(prefix string) { - if openTsdbURI == "" { - return - } - - backend := &openTSDBBackend{ - prefix: prefix, - commonTags: stats.ParseCommonTags(stats.CommonTags), - } - - stats.RegisterPushBackend("opentsdb", backend) - - servenv.HTTPHandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - dataPoints := (*backend).getDataPoints() - sort.Sort(byMetric(dataPoints)) - - if b, err := json.MarshalIndent(dataPoints, "", " "); err != nil { - w.Write([]byte(err.Error())) - } else { - w.Write(b) - } - }) } -// PushAll pushes all stats to OpenTSDB -func (backend *openTSDBBackend) PushAll() error { - return sendDataPoints(backend.getDataPoints()) -} - -// getDataPoints fetches all stats in an opentsdb-compatible format. -// This is separated from PushAll() so it can be reused for the /debug/opentsdb handler. -func (backend *openTSDBBackend) getDataPoints() []dataPoint { - dataCollector := &dataCollector{ - settings: backend, - timestamp: time.Now().Unix(), - } - +func (dc *collector) collectAll() { expvar.Do(func(kv expvar.KeyValue) { - dataCollector.addExpVar(kv) + dc.addExpVar(kv) }) - - return dataCollector.dataPoints } -// combineMetricName joins parts of a hierarchical name with a "." -func combineMetricName(parts ...string) string { - return strings.Join(parts, ".") +func (dc *collector) collectOne(name string, v expvar.Var) { + dc.addExpVar(expvar.KeyValue{ + Key: name, + Value: v, + }) } -func (dc *dataCollector) addInt(metric string, val int64, tags map[string]string) { +func (dc *collector) addInt(metric string, val int64, tags map[string]string) { dc.addFloat(metric, float64(val), tags) } -func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]string) { +func (dc *collector) addFloat(metric string, val float64, tags map[string]string) { var fullMetric string - if len(dc.settings.prefix) > 0 { - fullMetric = combineMetricName(dc.settings.prefix, metric) + if len(dc.prefix) > 0 { + fullMetric = combineMetricName(dc.prefix, metric) } else { fullMetric = metric } @@ -182,20 +78,20 @@ func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]st } fullTags := make(map[string]string) - for k, v := range dc.settings.commonTags { + for k, v := range dc.commonTags { fullTags[sanitize(k)] = sanitize(v) } for k, v := range tags { fullTags[sanitize(k)] = sanitize(v) } - dp := dataPoint{ + dp := &DataPoint{ Metric: sanitize(fullMetric), Value: val, Timestamp: float64(dc.timestamp), Tags: fullTags, } - dc.dataPoints = append(dc.dataPoints, dp) + dc.data = append(dc.data, dp) } // addExpVar adds all the data points associated with a particular expvar to the list of @@ -206,7 +102,7 @@ func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]st // // Generic unrecognized expvars are serialized to json and their int/float values are exported. // Strings and lists in expvars are not exported. -func (dc *dataCollector) addExpVar(kv expvar.KeyValue) { +func (dc *collector) addExpVar(kv expvar.KeyValue) { k := kv.Key switch v := kv.Value.(type) { case stats.FloatFunc: @@ -268,24 +164,8 @@ func (dc *dataCollector) addExpVar(kv expvar.KeyValue) { } } -// makeLabel builds a tag list with a single label + value. -func makeLabel(labelName string, labelVal string) map[string]string { - return map[string]string{labelName: labelVal} -} - -// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it -// apart into a map of label name -> label value. -func makeLabels(labelNames []string, labelValsCombined string) map[string]string { - tags := make(map[string]string) - labelVals := strings.Split(labelValsCombined, ".") - for i, v := range labelVals { - tags[labelNames[i]] = v - } - return tags -} - // addUnrecognizedExpvars recurses into a json object to pull out float64 variables to report. -func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]any) { +func (dc *collector) addUnrecognizedExpvars(prefix string, obj map[string]any) { for k, v := range obj { prefix := combineMetricName(prefix, k) switch v := v.(type) { @@ -298,7 +178,7 @@ func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]an } // addTimings converts a vitess Timings stat to something opentsdb can deal with. -func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, prefix string) { +func (dc *collector) addTimings(labels []string, timings *stats.Timings, prefix string) { histograms := timings.Histograms() for labelValsCombined, histogram := range histograms { // If you prefer millisecond timings over nanoseconds you can pass 1000000 here instead of 1. @@ -306,7 +186,7 @@ func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, pre } } -func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) { +func (dc *collector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) { // TODO: OpenTSDB 2.3 doesn't have histogram support, although it's forthcoming. // For simplicity we report each bucket as a different metric. // @@ -335,39 +215,23 @@ func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64 ) } -// byMetric implements sort.Interface for []dataPoint based on the metric key -// and then tag values (prioritized in tag name order). Having a consistent sort order -// is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly -// in the tests. -type byMetric []dataPoint - -func (m byMetric) Len() int { return len(m) } -func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -func (m byMetric) Less(i, j int) bool { - if m[i].Metric < m[j].Metric { - return true - } - - if m[i].Metric > m[j].Metric { - return false - } +// combineMetricName joins parts of a hierarchical name with a "." +func combineMetricName(parts ...string) string { + return strings.Join(parts, ".") +} - // Metric names are the same. We can use tag values to figure out the sort order. - // The deciding tag will be the lexicographically earliest tag name where tag values differ. - decidingTagName := "" - result := false - for tagName, iVal := range m[i].Tags { - jVal, ok := m[j].Tags[tagName] - if !ok { - // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier. - // This shouldn't happen in practice, though, if metric code is correct... - return true - } +// makeLabel builds a tag list with a single label + value. +func makeLabel(labelName string, labelVal string) map[string]string { + return map[string]string{labelName: labelVal} +} - if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") { - decidingTagName = tagName - result = iVal < jVal - } +// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it +// apart into a map of label name -> label value. +func makeLabels(labelNames []string, labelValsCombined string) map[string]string { + tags := make(map[string]string) + labelVals := strings.Split(labelValsCombined, ".") + for i, v := range labelVals { + tags[labelNames[i]] = v } - return result + return tags } diff --git a/go/stats/opentsdb/datapoint.go b/go/stats/opentsdb/datapoint.go new file mode 100644 index 00000000000..42e69b84d47 --- /dev/null +++ b/go/stats/opentsdb/datapoint.go @@ -0,0 +1,90 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "fmt" + "strconv" + "strings" +) + +// DataPoint represents a single OpenTSDB data point. +type DataPoint struct { + // Example: sys.cpu.nice + Metric string `json:"metric"` + // Seconds or milliseconds since unix epoch. + Timestamp float64 `json:"timestamp"` + Value float64 `json:"value"` + Tags map[string]string `json:"tags"` +} + +func (dp *DataPoint) MarshalText() (string, error) { + var sb strings.Builder + + if _, err := sb.WriteString(fmt.Sprintf("%s %f %f", dp.Metric, dp.Timestamp, dp.Value)); err != nil { + return "", err + } + + for k, v := range dp.Tags { + if _, err := sb.WriteString(fmt.Sprintf(" %s=%s", k, v)); err != nil { + return "", err + } + } + + if _, err := sb.WriteString("\n"); err != nil { + return "", err + } + + return sb.String(), nil +} + +func unmarshalTextToData(dp *DataPoint, text []byte) error { + parts := strings.Split(string(text), " ") + + if len(parts) < 3 { + // Technically every OpenTSDB time series requires at least one tag, + // but some of the metrics we send have zero. + return fmt.Errorf("require format: [ ]") + } + + dp.Metric = parts[0] + + timestamp, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + return err + } + dp.Timestamp = timestamp + + value, err := strconv.ParseFloat(parts[2], 64) + if err != nil { + return err + } + dp.Value = value + + for _, kv := range parts[3:] { + tagParts := strings.Split(kv, "=") + if len(tagParts) != 2 { + return fmt.Errorf("require tag format: ") + } + if dp.Tags == nil { + dp.Tags = make(map[string]string) + } + dp.Tags[tagParts[0]] = tagParts[1] + } + + return nil +} diff --git a/go/stats/opentsdb/datapoint_reader.go b/go/stats/opentsdb/datapoint_reader.go new file mode 100644 index 00000000000..441be9eb7a1 --- /dev/null +++ b/go/stats/opentsdb/datapoint_reader.go @@ -0,0 +1,53 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "bufio" + "io" +) + +var newLineDelimiter = byte('\n') + +// DataPointReader parses bytes from io.Reader into DataPoints. +type DataPointReader struct { + reader *bufio.Reader +} + +func NewDataPointReader(r io.Reader) *DataPointReader { + return &DataPointReader{ + reader: bufio.NewReader(r), + } +} + +// Read returns a DataPoint from the underlying io.Reader. +// +// Returns an error if no DataPoint could be parsed. +func (tr *DataPointReader) Read() (*DataPoint, error) { + bs, err := tr.reader.ReadBytes(newLineDelimiter) + if err != nil { + return nil, err + } + + dp := &DataPoint{} + + if err := unmarshalTextToData(dp, bs[:len(bs)-1]); err != nil { + return nil, err + } + + return dp, nil +} diff --git a/go/stats/opentsdb/doc.go b/go/stats/opentsdb/doc.go new file mode 100644 index 00000000000..88c22a58c70 --- /dev/null +++ b/go/stats/opentsdb/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package opentsdb adds support for pushing stats to opentsdb. +package opentsdb diff --git a/go/stats/opentsdb/file_writer.go b/go/stats/opentsdb/file_writer.go new file mode 100644 index 00000000000..7f2d2f2ccc7 --- /dev/null +++ b/go/stats/opentsdb/file_writer.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "io" + "os" +) + +type fileWriter struct { + writer io.WriteCloser +} + +func newFileWriter(path string) (writer, error) { + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + + return &fileWriter{ + writer: f, + }, nil +} + +func (fw *fileWriter) Write(data []*DataPoint) error { + for _, d := range data { + text, err := d.MarshalText() + if err != nil { + return err + } + + if _, err := fw.writer.Write([]byte(text)); err != nil { + return err + } + } + + return nil +} diff --git a/go/stats/opentsdb/flags.go b/go/stats/opentsdb/flags.go new file mode 100644 index 00000000000..8ccd0279981 --- /dev/null +++ b/go/stats/opentsdb/flags.go @@ -0,0 +1,38 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/servenv" +) + +var ( + openTSDBURI string +) + +func registerFlags(fs *pflag.FlagSet) { + fs.StringVar(&openTSDBURI, "opentsdb_uri", openTSDBURI, "URI of opentsdb /api/put method") +} + +func init() { + servenv.OnParseFor("vtbackup", registerFlags) + servenv.OnParseFor("vtctld", registerFlags) + servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vttablet", registerFlags) +} diff --git a/go/stats/opentsdb/http_writer.go b/go/stats/opentsdb/http_writer.go new file mode 100644 index 00000000000..7b7801d7f77 --- /dev/null +++ b/go/stats/opentsdb/http_writer.go @@ -0,0 +1,51 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "bytes" + "encoding/json" + "net/http" +) + +type httpWriter struct { + client *http.Client + uri string +} + +func newHTTPWriter(client *http.Client, uri string) *httpWriter { + return &httpWriter{ + client: client, + uri: uri, + } +} + +func (hw *httpWriter) Write(data []*DataPoint) error { + jsonb, err := json.Marshal(data) + if err != nil { + return err + } + + resp, err := hw.client.Post(hw.uri, "application/json", bytes.NewReader(jsonb)) + if err != nil { + return err + } + + resp.Body.Close() + + return nil +} diff --git a/go/stats/opentsdb/init.go b/go/stats/opentsdb/init.go new file mode 100644 index 00000000000..51186ad7650 --- /dev/null +++ b/go/stats/opentsdb/init.go @@ -0,0 +1,104 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "sort" + + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/servenv" +) + +var singletonBackend stats.PushBackend + +// Init attempts to create a singleton *opentsdb.backend and register it as a PushBackend. +// If it fails to create one, this is a noop. The prefix argument is an optional string +// to prepend to the name of every data point reported. +func Init(prefix string) { + // Needs to happen in servenv.OnRun() instead of init because it requires flag parsing and logging + servenv.OnRun(func() { + log.Info("Initializing opentsdb backend...") + backend, err := InitWithoutServenv(prefix) + if err != nil { + log.Infof("Failed to initialize singleton opentsdb backend: %v", err) + } else { + singletonBackend = backend + log.Info("Initialized opentsdb backend.") + } + }) +} + +// InitWithoutServenv initializes the opentsdb without servenv +func InitWithoutServenv(prefix string) (stats.PushBackend, error) { + b, err := newBackend(prefix) + + if err != nil { + return nil, err + } + + stats.RegisterPushBackend("opentsdb", b) + + servenv.HTTPHandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + collector := b.collector() + collector.collectAll() + data := collector.data + sort.Sort(byMetric(data)) + + if b, err := json.MarshalIndent(data, "", " "); err != nil { + w.Write([]byte(err.Error())) + } else { + w.Write(b) + } + }) + + return b, nil +} + +func newBackend(prefix string) (*backend, error) { + if openTSDBURI == "" { + return nil, fmt.Errorf("cannot create opentsdb PushBackend with empty --opentsdb_uri") + } + + var w writer + + // Use the file API when the uri is in format file://... + u, err := url.Parse(openTSDBURI) + if err != nil { + return nil, fmt.Errorf("failed to parse --opentsdb_uri %s: %v", openTSDBURI, err) + } else if u.Scheme == "file" { + fw, err := newFileWriter(u.Path) + if err != nil { + return nil, fmt.Errorf("failed to create file-based writer for --opentsdb_uri %s: %v", openTSDBURI, err) + } else { + w = fw + } + } else { + w = newHTTPWriter(&http.Client{}, openTSDBURI) + } + + return &backend{ + prefix: prefix, + commonTags: stats.ParseCommonTags(stats.CommonTags), + writer: w, + }, nil +} diff --git a/go/stats/opentsdb/opentsdb_test.go b/go/stats/opentsdb/opentsdb_test.go index 0e8ff240500..940ee845ada 100644 --- a/go/stats/opentsdb/opentsdb_test.go +++ b/go/stats/opentsdb/opentsdb_test.go @@ -352,15 +352,16 @@ func TestOpenTsdbTimings(t *testing.T) { } func checkOutput(t *testing.T, statName string, wantJSON string) { - backend := &openTSDBBackend{ + b := &backend{ prefix: "vtgate", commonTags: map[string]string{"host": "localhost"}, } timestamp := int64(1234) - dc := &dataCollector{ - settings: backend, - timestamp: timestamp, + dc := &collector{ + commonTags: b.commonTags, + prefix: b.prefix, + timestamp: timestamp, } found := false expvar.Do(func(kv expvar.KeyValue) { @@ -368,9 +369,9 @@ func checkOutput(t *testing.T, statName string, wantJSON string) { found = true dc.addExpVar(kv) - sort.Sort(byMetric(dc.dataPoints)) + sort.Sort(byMetric(dc.data)) - gotBytes, err := json.MarshalIndent(dc.dataPoints, "", " ") + gotBytes, err := json.MarshalIndent(dc.data, "", " ") if err != nil { t.Errorf("Failed to marshal json: %v", err) return diff --git a/go/stats/opentsdb/writer.go b/go/stats/opentsdb/writer.go new file mode 100644 index 00000000000..49d221cc782 --- /dev/null +++ b/go/stats/opentsdb/writer.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +type writer interface { + Write([]*DataPoint) error +} diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go index 269b185ff7c..f791d7b742d 100644 --- a/go/stats/statsd/statsd.go +++ b/go/stats/statsd/statsd.go @@ -219,7 +219,7 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) { } } -// PushAll flush out the pending metrics +// PushAll flushes out the pending metrics func (sb StatsBackend) PushAll() error { expvar.Do(func(kv expvar.KeyValue) { sb.addExpVar(kv) @@ -229,3 +229,15 @@ func (sb StatsBackend) PushAll() error { } return nil } + +// PushOne pushes the single provided metric. +func (sb StatsBackend) PushOne(name string, v stats.Variable) error { + sb.addExpVar(expvar.KeyValue{ + Key: name, + Value: v, + }) + if err := sb.statsdClient.Flush(); err != nil { + return err + } + return nil +} diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index 408cc64a21b..5e80d5d3cc3 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -19,7 +19,9 @@ package vtbackup import ( "context" "encoding/json" + "errors" "fmt" + "io" "os" "path" "strings" @@ -30,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/stats/opentsdb" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" @@ -59,8 +62,9 @@ func TestTabletInitialBackup(t *testing.T) { waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2}) - vtBackup(t, true, false, false) + dataPointReader := vtBackup(t, true, false, false) verifyBackupCount(t, shardKsName, 1) + verifyBackupStats(t, dataPointReader, true /* initialBackup */) // Initialize the tablets initTablets(t, false, false) @@ -144,11 +148,13 @@ func firstBackupTest(t *testing.T, tabletType string) { // backup the replica log.Infof("taking backup %s", time.Now()) - vtBackup(t, false, true, true) + dataPointReader := vtBackup(t, false, true, true) log.Infof("done taking backup %s", time.Now()) // check that the backup shows up in the listing verifyBackupCount(t, shardKsName, len(backups)+1) + // check that backup stats are what we expect + verifyBackupStats(t, dataPointReader, false /* initialBackup */) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) @@ -173,16 +179,24 @@ func firstBackupTest(t *testing.T, tabletType string) { verifyBackupCount(t, shardKsName, 0) } -func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) { +func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader { mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock") require.Nil(t, err) defer os.Remove(mysqlSocket.Name()) + // Prepare opentsdb stats file path. + statsPath := path.Join(t.TempDir(), fmt.Sprintf("opentsdb.%s.txt", t.Name())) + // Take the back using vtbackup executable extraArgs := []string{ "--allow_first_backup", "--db-credentials-file", dbCredentialFile, "--mysql_socket", mysqlSocket.Name(), + + // Use opentsdb for stats. + "--stats_backend", "opentsdb", + // Write stats to file for reading afterwards. + "--opentsdb_uri", fmt.Sprintf("file://%s", statsPath), } if restartBeforeBackup { extraArgs = append(extraArgs, "--restart_before_backup") @@ -201,6 +215,10 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo log.Infof("starting backup tablet %s", time.Now()) err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...) require.Nil(t, err) + + f, err := os.OpenFile(statsPath, os.O_RDONLY, 0) + require.NoError(t, err) + return opentsdb.NewDataPointReader(f) } func verifyBackupCount(t *testing.T, shardKsName string, expected int) []string { @@ -413,3 +431,73 @@ func waitForReplicationToCatchup(tablets []cluster.Vttablet) bool { } } } + +func verifyBackupStats(t *testing.T, dataPointReader *opentsdb.DataPointReader, initialBackup bool) { + // During execution, the following phases will become active, in order. + var expectActivePhases []string + if initialBackup { + expectActivePhases = []string{ + "initialbackup", + } + } else { + expectActivePhases = []string{ + "restorelastbackup", + "catchupreplication", + "takenewbackup", + } + } + + // Sequence of phase activity. + activePhases := make([]string, 0) + + // Last seen phase values. + phaseValues := make(map[string]int64) + + // Scan for phase activity until all we're out of stats to scan. + for dataPoint, err := dataPointReader.Read(); !errors.Is(err, io.EOF); dataPoint, err = dataPointReader.Read() { + // We're only interested in "vtbackup.phase" metrics in this test. + if dataPoint.Metric != "vtbackup.phase" { + continue + } + + phase := dataPoint.Tags["phase"] + value := int64(dataPoint.Value) + lastValue, ok := phaseValues[phase] + + // The value should always be 0 or 1. + require.True(t, int64(0) == value || int64(1) == value) + + // The first time the phase is reported, it should be 0. + if !ok { + require.Equal(t, int64(0), value) + } + + // Eventually the phase should go active. The next time it reports, + // it should go inactive. + if lastValue == 1 { + require.Equal(t, int64(0), value) + } + + // Record current value. + phaseValues[phase] = value + + // Add phase to sequence once it goes from active to inactive. + if lastValue == 1 && value == 0 { + activePhases = append(activePhases, phase) + } + + // Verify at most one phase is active. + activeCount := 0 + for _, value := range phaseValues { + if value == int64(0) { + continue + } + + activeCount++ + require.LessOrEqual(t, activeCount, 1) + } + } + + // Verify phase sequences. + require.Equal(t, expectActivePhases, activePhases) +} diff --git a/go/test/endtoend/cluster/vtbackup_process.go b/go/test/endtoend/cluster/vtbackup_process.go index ba508e8d593..57350922a21 100644 --- a/go/test/endtoend/cluster/vtbackup_process.go +++ b/go/test/endtoend/cluster/vtbackup_process.go @@ -69,8 +69,7 @@ func (vtbackup *VtbackupProcess) Setup() (err error) { //Backup Arguments are not optional "--backup_storage_implementation", "file", - "--file_backup_storage_root", - path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"), + "--file_backup_storage_root", path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"), ) if vtbackup.initialBackup {