Skip to content

Commit

Permalink
Prefer replicas that have innodb buffer pool populated in PRS (vitessio#16374)
Browse files Browse the repository at this point in the history

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jul 24, 2024
1 parent 8ff313f commit 7f639d3
Show file tree
Hide file tree
Showing 12 changed files with 493 additions and 188 deletions.
13 changes: 5 additions & 8 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,14 @@ func TestReparentWithDownReplica(t *testing.T) {
// Perform a graceful reparent operation. It will fail as one tablet is down.
out, err := utils.Prs(t, clusterInstance, tablets[1])
require.Error(t, err)
var insertVal int
// Assert that PRS failed
if clusterInstance.VtctlMajorVersion <= 17 {
assert.True(t, utils.SetReplicationSourceFailed(tablets[2], out))
// insert data into the new primary, check the connected replica work
insertVal = utils.ConfirmReplication(t, tablets[1], []*cluster.Vttablet{tablets[0], tablets[3]})
} else {
if clusterInstance.VtctlMajorVersion <= 20 {
assert.Contains(t, out, fmt.Sprintf("TabletManager.PrimaryStatus on %s", tablets[2].Alias))
// insert data into the old primary, check the connected replica works. The primary tablet shouldn't have changed.
insertVal = utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[3]})
} else {
assert.Contains(t, out, fmt.Sprintf("TabletManager.GetGlobalStatusVars on %s", tablets[2].Alias))
}
// insert data into the old primary, check the connected replica works. The primary tablet shouldn't have changed.
insertVal := utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[3]})

// restart mysql on the old replica, should still be connecting to the old primary
tablets[2].MysqlctlProcess.InitMysql = false
Expand Down
7 changes: 4 additions & 3 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ type FakeMysqlDaemon struct {
// PrimaryStatusError is used by PrimaryStatus.
PrimaryStatusError error

// GlobalStatusVars is used by GetGlobalStatusVars.
GlobalStatusVars map[string]string

// CurrentSourceHost is returned by ReplicationStatus.
CurrentSourceHost string

Expand Down Expand Up @@ -419,9 +422,7 @@ func (fmd *FakeMysqlDaemon) SetSuperReadOnly(ctx context.Context, on bool) (Rese

// GetGlobalStatusVars is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetGlobalStatusVars(ctx context.Context, variables []string) (map[string]string, error) {
return make(map[string]string), fmd.ExecuteSuperQueryList(ctx, []string{
"FAKE " + getGlobalStatusQuery,
})
return fmd.GlobalStatusVars, nil
}

// StartReplication is part of the MysqlDaemon interface.
Expand Down
39 changes: 25 additions & 14 deletions go/vt/vtctl/grpcvtctldserver/server_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/vtctl/reparentutil"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/protoutil"
Expand Down Expand Up @@ -402,19 +403,24 @@ func TestPlannedReparentShardSlow(t *testing.T) {
Error: nil,
},
},
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
GetGlobalStatusVarsResults: map[string]struct {
Statuses map[string]string
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
},
PrimaryPositionResults: map[string]struct {
Expand Down Expand Up @@ -519,19 +525,24 @@ func TestPlannedReparentShardSlow(t *testing.T) {
Error: nil,
},
},
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
GetGlobalStatusVarsResults: map[string]struct {
Statuses map[string]string
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
},
PrimaryPositionResults: map[string]struct {
Expand Down
39 changes: 24 additions & 15 deletions go/vt/vtctl/grpcvtctldserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

_flag "vitess.io/vitess/go/internal/flag"
"vitess.io/vitess/go/vt/vtctl/reparentutil"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -8296,19 +8297,24 @@ func TestPlannedReparentShard(t *testing.T) {
Error: nil,
},
},
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
GetGlobalStatusVarsResults: map[string]struct {
Statuses map[string]string
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
},
PrimaryPositionResults: map[string]struct {
Expand Down Expand Up @@ -8425,19 +8431,22 @@ func TestPlannedReparentShard(t *testing.T) {
},
},
tmc: &testutil.TabletManagerClient{
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
GetGlobalStatusVarsResults: map[string]struct {
Statuses map[string]string
Error error
}{
"zone1-0000000200": {
Error: fmt.Errorf("primary status failed"),
Error: fmt.Errorf("global status vars failed"),
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
Statuses: map[string]string{
reparentutil.InnodbBufferPoolsDataVar: "123",
},
},
},
},
Expand All @@ -8451,7 +8460,7 @@ func TestPlannedReparentShard(t *testing.T) {
WaitReplicasTimeout: protoutil.DurationToProto(time.Millisecond * 10),
},
expectEventsToOccur: true,
expectedErr: "primary status failed",
expectedErr: "global status vars failed",
},
}

Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ type TabletManagerClient struct {
Schema *tabletmanagerdatapb.SchemaDefinition
Error error
}
GetGlobalStatusVarsDelays map[string]time.Duration
GetGlobalStatusVarsResults map[string]struct {
Statuses map[string]string
Error error
Expand Down Expand Up @@ -742,6 +743,17 @@ func (fake *TabletManagerClient) GetGlobalStatusVars(ctx context.Context, tablet
}

key := topoproto.TabletAliasString(tablet.Alias)
if fake.GetGlobalStatusVarsDelays != nil {
if delay, ok := fake.GetGlobalStatusVarsDelays[key]; ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
// proceed to results
}
}
}

if result, ok := fake.GetGlobalStatusVarsResults[key]; ok {
return result.Statuses, result.Error
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/reparentutil/emergency_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (erp *EmergencyReparenter) findMostAdvanced(
}

// sort the tablets for finding the best intermediate source in ERS
err = sortTabletsForReparent(validTablets, tabletPositions, opts.durability)
err = sortTabletsForReparent(validTablets, tabletPositions, nil, opts.durability)
if err != nil {
return nil, nil, err
}
Expand Down
31 changes: 23 additions & 8 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package reparentutil
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand All @@ -44,6 +45,7 @@ var (
prsCounter = stats.NewCountersWithMultiLabels("PlannedReparentCounts", "Number of times Planned Reparent Shard has been run",
[]string{"Keyspace", "Shard", "Result"},
)
InnodbBufferPoolsDataVar = "Innodb_buffer_pool_pages_data"
)

// PlannedReparenter performs PlannedReparentShard operations.
Expand Down Expand Up @@ -158,6 +160,7 @@ func (pr *PlannedReparenter) preflightChecks(
ctx context.Context,
ev *events.Reparent,
tabletMap map[string]*topo.TabletInfo,
innodbBufferPoolData map[string]int,
opts *PlannedReparentOptions, // we take a pointer here to set NewPrimaryAlias
) (isNoop bool, err error) {
// We don't want to fail when both NewPrimaryAlias and AvoidPrimaryAlias are nil.
Expand All @@ -178,7 +181,7 @@ func (pr *PlannedReparenter) preflightChecks(
}

event.DispatchUpdate(ev, "electing a primary candidate")
opts.NewPrimaryAlias, err = ElectNewPrimary(ctx, pr.tmc, &ev.ShardInfo, tabletMap, opts.NewPrimaryAlias, opts.AvoidPrimaryAlias, opts.WaitReplicasTimeout, opts.TolerableReplLag, opts.durability, pr.logger)
opts.NewPrimaryAlias, err = ElectNewPrimary(ctx, pr.tmc, &ev.ShardInfo, tabletMap, innodbBufferPoolData, opts.NewPrimaryAlias, opts.AvoidPrimaryAlias, opts.WaitReplicasTimeout, opts.TolerableReplLag, opts.durability, pr.logger)
if err != nil {
return true, err
}
Expand Down Expand Up @@ -523,13 +526,13 @@ func (pr *PlannedReparenter) reparentShardLocked(
return err
}

err = pr.verifyAllTabletsReachable(ctx, tabletMap)
innodbBufferPoolData, err := pr.verifyAllTabletsReachable(ctx, tabletMap)
if err != nil {
return err
}

// Check invariants that PlannedReparentShard depends on.
if isNoop, err := pr.preflightChecks(ctx, ev, tabletMap, &opts); err != nil {
if isNoop, err := pr.preflightChecks(ctx, ev, tabletMap, innodbBufferPoolData, &opts); err != nil {
return err
} else if isNoop {
return nil
Expand Down Expand Up @@ -730,18 +733,30 @@ func (pr *PlannedReparenter) reparentTablets(
}

// verifyAllTabletsReachable verifies that all the tablets are reachable when running PRS.
func (pr *PlannedReparenter) verifyAllTabletsReachable(ctx context.Context, tabletMap map[string]*topo.TabletInfo) error {
func (pr *PlannedReparenter) verifyAllTabletsReachable(ctx context.Context, tabletMap map[string]*topo.TabletInfo) (map[string]int, error) {
// Create a cancellable context for the entire set of RPCs to verify reachability.
verifyCtx, verifyCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer verifyCancel()

innodbBufferPoolsData := make(map[string]int)
var mu sync.Mutex
errorGroup, groupCtx := errgroup.WithContext(verifyCtx)
for _, info := range tabletMap {
for tblStr, info := range tabletMap {
tablet := info.Tablet
errorGroup.Go(func() error {
_, err := pr.tmc.PrimaryStatus(groupCtx, tablet)
return err
statusValues, err := pr.tmc.GetGlobalStatusVars(groupCtx, tablet, []string{InnodbBufferPoolsDataVar})
if err != nil {
return err
}
// We are ignoring the error in conversion because some MySQL variants might not have this
// status variable like MariaDB.
val, _ := strconv.Atoi(statusValues[InnodbBufferPoolsDataVar])
mu.Lock()
defer mu.Unlock()
innodbBufferPoolsData[tblStr] = val
return nil
})
}
return errorGroup.Wait()
err := errorGroup.Wait()
return innodbBufferPoolsData, err
}
Loading

0 comments on commit 7f639d3

Please sign in to comment.