diff --git a/go.sum b/go.sum index 6ce1df5e358..cc300ba3899 100644 --- a/go.sum +++ b/go.sum @@ -169,7 +169,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/go/vt/vttablet/tabletmanager/replmanager.go b/go/vt/vttablet/tabletmanager/replmanager.go index 3f949494be4..4985e2401ba 100644 --- a/go/vt/vttablet/tabletmanager/replmanager.go +++ b/go/vt/vttablet/tabletmanager/replmanager.go @@ -17,13 +17,12 @@ limitations under the License. package tabletmanager import ( + "context" "os" "path" "sync" "time" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" @@ -105,6 +104,7 @@ func (rm *replManager) check() { func (rm *replManager) checkActionLocked() { status, err := rm.tm.MysqlDaemon.ReplicationStatus() if err != nil { + log.Infof("slack-debug: %v", err) if err != mysql.ErrNotReplica { return } @@ -116,12 +116,14 @@ func (rm *replManager) checkActionLocked() { } } + log.Infof("slack-debug: rm.failed=%v", rm.failed) if !rm.failed { log.Infof("Replication is stopped, reconnecting to primary.") } ctx, cancel := context.WithTimeout(rm.ctx, 5*time.Second) defer cancel() if err := rm.tm.repairReplication(ctx); err != nil { + log.Infof("slack-debug: repairReplication failed with=%v", err) if !rm.failed { rm.failed = true log.Infof("Failed to reconnect to primary: %v, will keep retrying.", err) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 7f15a3cf26e..1b5dc22348d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -17,6 +17,7 @@ limitations under the License. package tabletmanager import ( + "context" "flag" "fmt" "strconv" @@ -25,8 +26,6 @@ import ( "vitess.io/vitess/go/vt/proto/vtrpc" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -476,10 +475,10 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // // It attemps to idempotently ensure the following guarantees upon returning // successfully: -// * No future writes will be accepted. -// * No writes are in-flight. -// * MySQL is in read-only mode. -// * Semi-sync settings are consistent with a REPLICA tablet. +// - No future writes will be accepted. +// - No writes are in-flight. +// - MySQL is in read-only mode. +// - Semi-sync settings are consistent with a REPLICA tablet. // // If necessary, it waits for all in-flight writes to complete or time out. // @@ -703,6 +702,7 @@ func (tm *TabletManager) setReplicationSourceRepairReplication(ctx context.Conte return err } + log.Infof("slack-debug: calling tm.TopoServer.LockShard") ctx, unlock, lockErr := tm.TopoServer.LockShard(ctx, parent.Tablet.GetKeyspace(), parent.Tablet.GetShard(), fmt.Sprintf("repairReplication to %v as parent)", topoproto.TabletAliasString(parentAlias))) if lockErr != nil { return lockErr @@ -724,6 +724,12 @@ func (tm *TabletManager) setReplicationSourceSemiSyncNoAction(ctx context.Contex } func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool, semiSync SemiSyncAction) (err error) { + tm._isSetReplicationSourceLockedRunning = true + + defer func() { + tm._isSetReplicationSourceLockedRunning = false + }() + // End orchestrator maintenance at the end of fixing replication. // This is a best effort operation, so it should happen in a goroutine defer func() { @@ -745,6 +751,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA // unintentionally change the type of RDONLY tablets tablet := tm.Tablet() if tablet.Type == topodatapb.TabletType_PRIMARY { + log.Infof("slack-debug: calling tm.tmState.ChangeTabletType") if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone); err != nil { return err } @@ -755,6 +762,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA shouldbeReplicating := false status, err := tm.MysqlDaemon.ReplicationStatus() if err == mysql.ErrNotReplica { + log.Infof("slack-debug: err == mysql.ErrNotReplica") // This is a special error that means we actually succeeded in reading // the status, but the status is empty because replication is not // configured. We assume this means we used to be a primary, so we always @@ -781,6 +789,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA if tabletType == topodatapb.TabletType_PRIMARY { tabletType = topodatapb.TabletType_REPLICA } + log.Infof("slack-debug: calling tm.fixSemiSync") if err := tm.fixSemiSync(tabletType, semiSync); err != nil { return err } @@ -797,6 +806,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA host := parent.Tablet.MysqlHostname port := int(parent.Tablet.MysqlPort) if status.SourceHost != host || status.SourcePort != port { + log.Infof("slack-debug: calling tm.MysqlDaemon.SetReplicationSource") // This handles both changing the address and starting replication. if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil { if err := tm.handleRelayLogError(err); err != nil { @@ -1053,18 +1063,18 @@ func (tm *TabletManager) fixSemiSync(tabletType topodatapb.TabletType, semiSync // This following code will be uncommented and the above deleted when we are ready to use the // durability policies for setting the semi_sync information - //switch semiSync { - //case SemiSyncActionNone: + // switch semiSync { + // case SemiSyncActionNone: // return nil - //case SemiSyncActionSet: + // case SemiSyncActionSet: // // Always enable replica-side since it doesn't hurt to keep it on for a primary. // // The primary-side needs to be off for a replica, or else it will get stuck. // return tm.MysqlDaemon.SetSemiSyncEnabled(tabletType == topodatapb.TabletType_PRIMARY, true) - //case SemiSyncActionUnset: + // case SemiSyncActionUnset: // return tm.MysqlDaemon.SetSemiSyncEnabled(false, false) - //default: + // default: // return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync) - //} + // } } func (tm *TabletManager) isPrimarySideSemiSyncEnabled() bool { @@ -1077,10 +1087,10 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT // Semi-sync handling is not enabled. return nil } - //if semiSync == SemiSyncActionNone { + // if semiSync == SemiSyncActionNone { // // Semi-sync handling is not required. // return nil - //} + // } if tabletType == topodatapb.TabletType_PRIMARY { // Primary is special. It is always handled at the @@ -1106,7 +1116,7 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT return nil } - //shouldAck := semiSync == SemiSyncActionSet + // shouldAck := semiSync == SemiSyncActionSet shouldAck := isPrimaryEligible(tabletType) acking, err := tm.MysqlDaemon.SemiSyncReplicationStatus() if err != nil { @@ -1144,6 +1154,17 @@ func (tm *TabletManager) handleRelayLogError(err error) error { // repairReplication tries to connect this server to whoever is // the current primary of the shard, and start replicating. func (tm *TabletManager) repairReplication(ctx context.Context) error { + log.Infof("slack-debug: entering repairReplication") + + if tm._isSetReplicationSourceLockedRunning { + // we are actively setting replication source, + // repairReplication will block due to higher + // authority holding a shard lock (PRS on vtctld) + log.Infof("slack-debug: we are actively setting replication source, exiting") + + return nil + } + tablet := tm.Tablet() si, err := tm.TopoServer.GetShard(ctx, tablet.Keyspace, tablet.Shard) @@ -1164,6 +1185,7 @@ func (tm *TabletManager) repairReplication(ctx context.Context) error { // If Orchestrator is configured and if Orchestrator is actively reparenting, we should not repairReplication if tm.orc != nil { + log.Infof("slack-debug: tm.orc != nil") re, err := tm.orc.InActiveShardRecovery(tablet) if err != nil { return err diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index b56d1a31bbe..b04eda587ca 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -195,6 +195,8 @@ type TabletManager struct { _lockTablesTimer *time.Timer // _isBackupRunning tells us whether there is a backup that is currently running _isBackupRunning bool + // _isSetReplicationSourceLockedRunning indicates we are actively running setReplicationSourceLocked + _isSetReplicationSourceLockedRunning bool } // BuildTabletFromInput builds a tablet record from input parameters.