Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix-vtorc-timeouts
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed May 22, 2024
2 parents 85737df + d20cb57 commit acfcfc0
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 283 deletions.
20 changes: 20 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,23 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
require.NoError(t, err)
}

// TestERSWithWriteInPromoteReplica tests that ERS doesn't fail even if there is a
// write that happens when PromoteReplica is called.
func TestERSWithWriteInPromoteReplica(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// Drop a table so that when sidecardb changes are checked, we run a DML query.
utils.RunSQLs(context.Background(), t, []string{
"set sql_log_bin=0",
`SET @@global.super_read_only=0`,
`DROP TABLE _vt.heartbeat`,
"set sql_log_bin=1",
}, tablets[3])
_, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s")
require.NoError(t, err, "ERS should not fail even if there is a sidecardb change")
}
10 changes: 6 additions & 4 deletions go/vt/vtctl/grpcvtctldserver/server_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func TestEmergencyReparentShardSlow(t *testing.T) {
},
},
},
PopulateReparentJournalDelays: map[string]time.Duration{
"zone1-0000000200": time.Second * 29,
SetReplicationSourceDelays: map[string]time.Duration{
"zone1-0000000100": time.Second * 29,
"zone1-0000000101": time.Second * 29,
},
PopulateReparentJournalResults: map[string]error{
"zone1-0000000200": nil,
Expand Down Expand Up @@ -224,8 +225,9 @@ func TestEmergencyReparentShardSlow(t *testing.T) {
},
},
},
PopulateReparentJournalDelays: map[string]time.Duration{
"zone1-0000000200": time.Second * 31,
SetReplicationSourceDelays: map[string]time.Duration{
"zone1-0000000100": time.Second * 31,
"zone1-0000000101": time.Second * 31,
},
PopulateReparentJournalResults: map[string]error{
"zone1-0000000200": nil,
Expand Down
75 changes: 31 additions & 44 deletions go/vt/vtctl/reparentutil/emergency_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
// Since the new primary tablet belongs to the validCandidateTablets list, we no longer need any additional constraint checks

// Final step is to promote our primary candidate
err = erp.promoteNewPrimary(ctx, ev, newPrimary, opts, tabletMap, stoppedReplicationSnapshot.statusMap)
_, err = erp.reparentReplicas(ctx, ev, newPrimary, tabletMap, stoppedReplicationSnapshot.statusMap, opts, false /* intermediateReparent */)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func (erp *EmergencyReparenter) promoteIntermediateSource(

// we reparent all the other valid tablets to start replication from our new source
// we wait for all the replicas so that we can choose a better candidate from the ones that started replication later
reachableTablets, err := erp.reparentReplicas(ctx, ev, source, validTabletMap, statusMap, opts, true /* waitForAllReplicas */, false /* populateReparentJournal */)
reachableTablets, err := erp.reparentReplicas(ctx, ev, source, validTabletMap, statusMap, opts, true /* intermediateReparent */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -487,8 +487,10 @@ func (erp *EmergencyReparenter) reparentReplicas(
tabletMap map[string]*topo.TabletInfo,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
opts EmergencyReparentOptions,
waitForAllReplicas bool,
populateReparentJournal bool,
intermediateReparent bool, // intermediateReparent represents whether the reparenting of the replicas is the final reparent or not.
// Since ERS can sometimes promote a tablet, which isn't a candidate for promotion, if it is the most advanced, we don't want to
// call PromoteReplica on it. We just want to get all replicas to replicate from it to get caught up, after which we'll promote the primary
// candidate separately. During the final promotion, we call `PromoteReplica` and `PopulateReparentJournal`.
) ([]*topodatapb.Tablet, error) {

var (
Expand All @@ -497,6 +499,8 @@ func (erp *EmergencyReparenter) reparentReplicas(
)

replCtx, replCancel := context.WithTimeout(context.Background(), opts.WaitReplicasTimeout)
primaryCtx, primaryCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer primaryCancel()

event.DispatchUpdate(ev, "reparenting all tablets")

Expand All @@ -518,13 +522,26 @@ func (erp *EmergencyReparenter) reparentReplicas(
rec := concurrency.AllErrorRecorder{}

handlePrimary := func(alias string, tablet *topodatapb.Tablet) error {
position, err := erp.tmc.PrimaryPosition(replCtx, tablet)
if err != nil {
return err
}
if populateReparentJournal {
if !intermediateReparent {
var position string
var err error
if ev.ShardInfo.PrimaryAlias == nil {
erp.logger.Infof("setting up %v as new primary for an uninitialized cluster", alias)
// we call InitPrimary when the PrimaryAlias in the ShardInfo is empty. This happens when we have an uninitialized cluster.
position, err = erp.tmc.InitPrimary(primaryCtx, tablet, SemiSyncAckers(opts.durability, tablet) > 0)
} else {
erp.logger.Infof("starting promotion for the new primary - %v", alias)
// we call PromoteReplica which changes the tablet type, fixes the semi-sync, set the primary to read-write and flushes the binlogs
position, err = erp.tmc.PromoteReplica(primaryCtx, tablet, SemiSyncAckers(opts.durability, tablet) > 0)
}
if err != nil {
return vterrors.Wrapf(err, "primary-elect tablet %v failed to be upgraded to primary: %v", alias, err)
}
erp.logger.Infof("populating reparent journal on new primary %v", alias)
return erp.tmc.PopulateReparentJournal(replCtx, tablet, now, opts.lockAction, newPrimaryTablet.Alias, position)
err = erp.tmc.PopulateReparentJournal(primaryCtx, tablet, now, opts.lockAction, tablet.Alias, position)
if err != nil {
return vterrors.Wrapf(err, "failed to PopulateReparentJournal on primary: %v", err)
}
}
return nil
}
Expand Down Expand Up @@ -559,8 +576,8 @@ func (erp *EmergencyReparenter) reparentReplicas(
replicaMutex.Unlock()

// Signal that at least one goroutine succeeded to SetReplicationSource.
// We do this only when we do not want to wait for all the replicas
if !waitForAllReplicas {
// We do this only when we do not want to wait for all the replicas.
if !intermediateReparent {
replSuccessCancel()
}
}
Expand Down Expand Up @@ -594,10 +611,10 @@ func (erp *EmergencyReparenter) reparentReplicas(

primaryErr := handlePrimary(topoproto.TabletAliasString(newPrimaryTablet.Alias), newPrimaryTablet)
if primaryErr != nil {
erp.logger.Warningf("primary failed to PopulateReparentJournal")
erp.logger.Errorf("failed to promote %s to primary", topoproto.TabletAliasString(newPrimaryTablet.Alias))
replCancel()

return nil, vterrors.Wrapf(primaryErr, "failed to PopulateReparentJournal on primary: %v", primaryErr)
return nil, vterrors.Wrapf(primaryErr, "failed to promote %v to primary", topoproto.TabletAliasString(newPrimaryTablet.Alias))
}

// We should only cancel the context that all the replicas are using when they are done.
Expand Down Expand Up @@ -709,36 +726,6 @@ func (erp *EmergencyReparenter) identifyPrimaryCandidate(
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unreachable - did not find a valid primary candidate even though the valid candidate list was non-empty")
}

func (erp *EmergencyReparenter) promoteNewPrimary(
ctx context.Context,
ev *events.Reparent,
newPrimary *topodatapb.Tablet,
opts EmergencyReparentOptions,
tabletMap map[string]*topo.TabletInfo,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
) error {
var err error
if ev.ShardInfo.PrimaryAlias == nil {
erp.logger.Infof("setting up %v as new primary for an uninitialized cluster", newPrimary.Alias)
// we call InitPrimary when the PrimaryAlias in the ShardInfo is empty. This happens when we have an uninitialized cluster.
_, err = erp.tmc.InitPrimary(ctx, newPrimary, SemiSyncAckers(opts.durability, newPrimary) > 0)
} else {
erp.logger.Infof("starting promotion for the new primary - %v", newPrimary.Alias)
// we call PromoteReplica which changes the tablet type, fixes the semi-sync, set the primary to read-write and flushes the binlogs
_, err = erp.tmc.PromoteReplica(ctx, newPrimary, SemiSyncAckers(opts.durability, newPrimary) > 0)
}
if err != nil {
return vterrors.Wrapf(err, "primary-elect tablet %v failed to be upgraded to primary: %v", newPrimary.Alias, err)
}
// we now reparent all the replicas to the new primary we have promoted.
// Here we do not need to wait for all the replicas, We can finish early when even 1 succeeds.
_, err = erp.reparentReplicas(ctx, ev, newPrimary, tabletMap, statusMap, opts, false /* waitForAllReplicas */, true /* populateReparentJournal */)
if err != nil {
return err
}
return nil
}

// filterValidCandidates filters valid tablets, keeping only the ones which can successfully be promoted without any constraint failures and can make forward progress on being promoted
func (erp *EmergencyReparenter) filterValidCandidates(validTablets []*topodatapb.Tablet, tabletsReachable []*topodatapb.Tablet, prevPrimary *topodatapb.Tablet, opts EmergencyReparentOptions) ([]*topodatapb.Tablet, error) {
var restrictedValidTablets []*topodatapb.Tablet
Expand Down
Loading

0 comments on commit acfcfc0

Please sign in to comment.