Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ShowReplicationStatus with proper timeout support #14483

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Merge branch 'main' of https://github.com/vitessio/vitess into fix-he…
…althcheck-stalls
  • Loading branch information
arthurschreiber committed May 30, 2024
commit de80197216d0f9026a94b66e236097bc2d611478
4 changes: 2 additions & 2 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
}

lastStatus = status
status, statusErr = mysqld.ReplicationStatusWithContext(ctx)
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continue
Expand Down Expand Up @@ -565,7 +565,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
}

// Did we make any progress?
status, statusErr = mysqld.ReplicationStatusWithContext(ctx)
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
return fmt.Errorf("can't get replication status: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac

// See if we need to restart replication after backup.
params.Logger.Infof("getting current replication status")
replicaStatus, err := params.Mysqld.ReplicationStatusWithContext(ctx)
replicaStatus, err := params.Mysqld.ReplicationStatus(ctx)
switch err {
case nil:
replicaStartRequired = replicaStatus.Healthy() && !DisableActiveReparents
Expand Down Expand Up @@ -456,7 +456,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac
if err := params.Mysqld.StopReplication(ctx, params.HookExtraEnv); err != nil {
return BackupUnusable, vterrors.Wrapf(err, "can't stop replica")
}
replicaStatus, err := params.Mysqld.ReplicationStatusWithContext(ctx)
replicaStatus, err := params.Mysqld.ReplicationStatus(ctx)
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "can't get replica status")
}
Expand Down Expand Up @@ -557,7 +557,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac
if err := ctx.Err(); err != nil {
return backupResult, err
}
status, err := params.Mysqld.ReplicationStatusWithContext(ctx)
status, err := params.Mysqld.ReplicationStatus(ctx)
if err != nil {
return backupResult, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.
}

func (fmd *FakeMysqlDaemon) ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) {
return fmd.ReplicationStatus()
return fmd.ReplicationStatus(ctx)
}

// PrimaryStatus is part of the MysqlDaemon interface.
Expand Down
3 changes: 1 addition & 2 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ type MysqlDaemon interface {
StartReplicationUntilAfter(ctx context.Context, pos replication.Position) error
StopReplication(ctx context.Context, hookExtraEnv map[string]string) error
StopIOThread(ctx context.Context) error
ReplicationStatus() (replication.ReplicationStatus, error)
ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error)
ReplicationStatus(ctx context.Context) (replication.ReplicationStatus, error)
PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error)
ReplicationConfiguration(ctx context.Context) (*replicationdata.Configuration, error)
GetGTIDPurged(ctx context.Context) (replication.Position, error)
Expand Down
6 changes: 1 addition & 5 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,7 @@ func (mysqld *Mysqld) CatchupToGTID(ctx context.Context, targetPos replication.P
}

// ReplicationStatus returns the server replication status
func (mysqld *Mysqld) ReplicationStatus() (replication.ReplicationStatus, error) {
return mysqld.ReplicationStatusWithContext(context.TODO())
}

func (mysqld *Mysqld) ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) {
func (mysqld *Mysqld) ReplicationStatus(ctx context.Context) (replication.ReplicationStatus, error) {
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
if err != nil {
return replication.ReplicationStatus{}, err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func (tm *TabletManager) startReplication(ctx context.Context, pos replication.P
if err := ctx.Err(); err != nil {
return err
}
status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
status, err := tm.MysqlDaemon.ReplicationStatus(ctx)
if err != nil {
return vterrors.Wrap(err, "can't get replication status")
}
Expand Down
13 changes: 8 additions & 5 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import (

// ReplicationStatus returns the replication status
func (tm *TabletManager) ReplicationStatus(ctx context.Context) (*replicationdatapb.Status, error) {
status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
status, err := tm.MysqlDaemon.ReplicationStatus(ctx)
if err != nil {
return nil, err
}
Expand All @@ -64,7 +67,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
}

// Replication status - "SHOW REPLICA STATUS"
replicationStatus, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
replicationStatus, err := tm.MysqlDaemon.ReplicationStatus(ctx)
var replicationStatusProto *replicationdatapb.Status
if err != nil && err != mysql.ErrNotReplica {
return nil, err
Expand Down Expand Up @@ -695,7 +698,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
// See if we were replicating at all, and should be replicating.
wasReplicating := false
shouldbeReplicating := false
status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
status, err := tm.MysqlDaemon.ReplicationStatus(ctx)
if err == mysql.ErrNotReplica {
// This is a special error that means we actually succeeded in reading
// the status, but the status is empty because replication is not
Expand Down Expand Up @@ -824,7 +827,7 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe
// Get the status before we stop replication.
// Doing this first allows us to return the status in the case that stopping replication
// returns an error, so a user can optionally inspect the status before a stop was called.
rs, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
rs, err := tm.MysqlDaemon.ReplicationStatus(ctx)
if err != nil {
return StopReplicationAndGetStatusResponse{}, vterrors.Wrap(err, "before status failed")
}
Expand Down Expand Up @@ -866,7 +869,7 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe
}

// Get the status after we stop replication so we have up to date position and relay log positions.
rsAfter, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
rsAfter, err := tm.MysqlDaemon.ReplicationStatus(ctx)
if err != nil {
return StopReplicationAndGetStatusResponse{
Status: &replicationdatapb.StopReplicationStatus{
Expand Down
5 changes: 2 additions & 3 deletions go/vt/vttablet/tabletserver/repltracker/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func (p *poller) Status() (time.Duration, error) {
p.mu.Lock()
defer p.mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

status, err := p.mysqld.ReplicationStatusWithContext(ctx)
status, err := p.mysqld.ReplicationStatus(ctx)
if err != nil {
return 0, err
}
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.