go/cmd/vtbackup: report replication status metrics during catch-up phase (#13995)

Signed-off-by: Max Englander <[email protected]>
Co-authored-by: Andrew Mason <[email protected]>
maxenglander and ajm188 authored Sep 19, 2023
1 parent f8a40dd commit 757333e
Showing 2 changed files with 54 additions and 4 deletions.
9 changes: 9 additions & 0 deletions changelog/18.0/18.0.0/summary.md
@@ -18,6 +18,7 @@
- [Deleted `vtgr`](#deleted-vtgr)
- **[New stats](#new-stats)**
- [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters)
- [VTBackup stat `PhaseStatus`](#vtbackup-stat-phase-status)
- **[VTTablet](#vttablet)**
- [VTTablet: New ResetSequences RPC](#vttablet-new-rpc-reset-sequences)
- **[Docker](#docker)**
@@ -120,6 +121,14 @@ The `vtgr` has been deprecated in Vitess 17, also see https://github.com/vitessi

The VTGate stat `VindexUnknownParameters` gauges unknown Vindex parameters found in the latest VSchema pulled from the topology.

#### <a id="vtbackup-stat-phase-status"/>VTBackup `PhaseStatus` stat

`PhaseStatus` reports a 1 (active) or a 0 (inactive) for each of the following phases and statuses (see the sketch after this list):

* `CatchUpReplication` phase has statuses `Stalled` and `Stopped`.
* `Stalled` is set to `1` when replication stops advancing.
* `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary.
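
A minimal sketch of how such a gauge is declared and updated with the `stats.NewGaugesWithMultiLabels` helper, following the pattern in the `go/cmd/vtbackup/vtbackup.go` diff below; the standalone `main`, the hard-coded label values, and the final print are illustrative only, not part of vtbackup:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/stats"
)

// phaseStatus holds one gauge value per (phase, status) label pair,
// mirroring the PhaseStatus stat that vtbackup exports.
var phaseStatus = stats.NewGaugesWithMultiLabels(
	"PhaseStatus",
	"Internal state of vtbackup phase.",
	[]string{"phase", "status"},
)

func main() {
	// Publish a 0 for every known pair up front so the gauge exists
	// before the phase runs (vtbackup does the same in its main()).
	phaseStatus.Set([]string{"CatchUpReplication", "Stalled"}, 0)
	phaseStatus.Set([]string{"CatchUpReplication", "Stopped"}, 0)

	// If replication stops advancing during catch-up, flip Stalled to 1 (active).
	phaseStatus.Set([]string{"CatchUpReplication", "Stalled"}, 1)

	// Print the current values, keyed by the joined label values.
	fmt.Println(phaseStatus.Counts())
}
```

At startup vtbackup publishes a `0` for every known phase/status pair, so the gauge is visible to dashboards before the catch-up phase begins.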

### <a id="vttablet"/>VTTablet

#### <a id="vttablet-new-rpc-reset-sequences"/>New ResetSequences RPC
49 changes: 45 additions & 4 deletions go/cmd/vtbackup/vtbackup.go
@@ -99,6 +99,10 @@ const (
// place a hard cap on the overall time for a backup, while also not waiting
// forever for things that should be quick.
operationTimeout = 1 * time.Minute

phaseNameCatchUpReplication = "CatchUpReplication"
phaseStatusCatchUpReplicationStalled = "Stalled"
phaseStatusCatchUpReplicationStopped = "Stopped"
)

var (
@@ -128,6 +132,17 @@ var (
"How long it took vtbackup to perform each phase (in seconds).",
"phase",
)
phaseStatus = stats.NewGaugesWithMultiLabels(
"PhaseStatus",
"Internal state of vtbackup phase.",
[]string{"phase", "status"},
)
phaseStatuses = map[string][]string{
phaseNameCatchUpReplication: {
phaseStatusCatchUpReplicationStalled,
phaseStatusCatchUpReplicationStopped,
},
}
)

func registerFlags(fs *pflag.FlagSet) {
@@ -203,6 +218,13 @@ func main() {
topoServer := topo.Open()
defer topoServer.Close()

// Initialize stats.
for phaseName, statuses := range phaseStatuses {
for _, status := range statuses {
phaseStatus.Set([]string{phaseName, status}, 0)
}
}

// Try to take a backup, if it's been long enough since the last one.
// Skip pruning if backup wasn't fully successful. We don't want to be
// deleting things if the backup process is not healthy.
@@ -433,15 +455,22 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
backupParams.BackupTime = time.Now()

// Wait for replication to catch up.
waitStartTime := time.Now()
var (
lastStatus replication.ReplicationStatus
status replication.ReplicationStatus
statusErr error

waitStartTime = time.Now()
)
for {
select {
case <-ctx.Done():
return fmt.Errorf("error in replication catch up: %v", ctx.Err())
case <-time.After(time.Second):
}

status, statusErr := mysqld.ReplicationStatus()
lastStatus = status
status, statusErr = mysqld.ReplicationStatus()
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continue
@@ -453,11 +482,21 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
durationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds()))
break
}
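// After the first iteration, compare the previous and current positions;
// if the position has not advanced, report the catch-up phase as stalled.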
if !lastStatus.Position.IsZero() {
if status.Position.Equal(lastStatus.Position) {
phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 1)
} else {
phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0)
}
}
if !status.Healthy() {
log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.")
phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 1)
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
}
} else {
phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0)
}
}

@@ -467,14 +506,16 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

// Did we make any progress?
status, err := mysqld.ReplicationStatus()
if err != nil {
status, statusErr = mysqld.ReplicationStatus()
if statusErr != nil {
return fmt.Errorf("can't get replication status: %v", err)
}
log.Infof("Replication caught up to %v", status.Position)
if !status.Position.AtLeast(primaryPos) && status.Position.Equal(restorePos) {
return fmt.Errorf("not taking backup: replication did not make any progress from restore point: %v", restorePos)
}
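// Catch-up finished successfully: clear both catch-up status gauges.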
phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0)
phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0)

// Re-enable redo logging.
if disabledRedoLog {
