Skip to content

Commit

Permalink
Merge branch 'slack-15.0' into logical-backups-backport2
Browse files Browse the repository at this point in the history
  • Loading branch information
rvrangel authored Oct 24, 2024
2 parents 9b83c1e + 37b690c commit 13ba09d
Show file tree
Hide file tree
Showing 16 changed files with 1,451 additions and 1,049 deletions.
32 changes: 27 additions & 5 deletions go/cmd/vtctldclient/command/reparents.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ var emergencyReparentShardOptions = struct {
Force bool
WaitReplicasTimeout time.Duration
NewPrimaryAliasStr string
ExpectedPrimaryAliasStr string
IgnoreReplicaAliasStrList []string
PreventCrossCellPromotion bool
}{}
Expand All @@ -104,6 +105,7 @@ func commandEmergencyReparentShard(cmd *cobra.Command, args []string) error {

var (
newPrimaryAlias *topodatapb.TabletAlias
expectedPrimaryAlias *topodatapb.TabletAlias
ignoreReplicaAliases = make([]*topodatapb.TabletAlias, len(emergencyReparentShardOptions.IgnoreReplicaAliasStrList))
)

Expand All @@ -114,6 +116,13 @@ func commandEmergencyReparentShard(cmd *cobra.Command, args []string) error {
}
}

if emergencyReparentShardOptions.ExpectedPrimaryAliasStr != "" {
expectedPrimaryAlias, err = topoproto.ParseTabletAlias(emergencyReparentShardOptions.ExpectedPrimaryAliasStr)
if err != nil {
return err
}
}

for i, aliasStr := range emergencyReparentShardOptions.IgnoreReplicaAliasStrList {
alias, err := topoproto.ParseTabletAlias(aliasStr)
if err != nil {
Expand All @@ -129,6 +138,7 @@ func commandEmergencyReparentShard(cmd *cobra.Command, args []string) error {
Keyspace: keyspace,
Shard: shard,
NewPrimary: newPrimaryAlias,
ExpectedPrimary: expectedPrimaryAlias,
IgnoreReplicas: ignoreReplicaAliases,
WaitReplicasTimeout: protoutil.DurationToProto(emergencyReparentShardOptions.WaitReplicasTimeout),
PreventCrossCellPromotion: emergencyReparentShardOptions.PreventCrossCellPromotion,
Expand Down Expand Up @@ -181,9 +191,10 @@ func commandInitShardPrimary(cmd *cobra.Command, args []string) error {
}

// plannedReparentShardOptions holds the values of the command-line flags
// registered for the PlannedReparentShard command.
var plannedReparentShardOptions = struct {
	// NewPrimaryAliasStr is the raw --new-primary flag value: the alias of
	// the tablet that should become the new primary.
	NewPrimaryAliasStr string
	// AvoidPrimaryAliasStr is the raw --avoid-primary flag value: the alias
	// of a tablet that should not be the primary.
	AvoidPrimaryAliasStr string
	// ExpectedPrimaryAliasStr is the raw --expected-primary flag value: the
	// alias of the tablet that must be the current primary for the reparent
	// to be processed.
	ExpectedPrimaryAliasStr string
	// WaitReplicasTimeout is the --wait-replicas-timeout flag value: how long
	// to wait for replicas to catch up on replication.
	WaitReplicasTimeout time.Duration
}{}

func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
Expand All @@ -193,8 +204,9 @@ func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
}

var (
newPrimaryAlias *topodatapb.TabletAlias
avoidPrimaryAlias *topodatapb.TabletAlias
newPrimaryAlias *topodatapb.TabletAlias
avoidPrimaryAlias *topodatapb.TabletAlias
expectedPrimaryAlias *topodatapb.TabletAlias
)

if plannedReparentShardOptions.NewPrimaryAliasStr != "" {
Expand All @@ -211,13 +223,21 @@ func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
}
}

if plannedReparentShardOptions.ExpectedPrimaryAliasStr != "" {
expectedPrimaryAlias, err = topoproto.ParseTabletAlias(plannedReparentShardOptions.ExpectedPrimaryAliasStr)
if err != nil {
return err
}
}

cli.FinishedParsing(cmd)

resp, err := client.PlannedReparentShard(commandCtx, &vtctldatapb.PlannedReparentShardRequest{
Keyspace: keyspace,
Shard: shard,
NewPrimary: newPrimaryAlias,
AvoidPrimary: avoidPrimaryAlias,
ExpectedPrimary: expectedPrimaryAlias,
WaitReplicasTimeout: protoutil.DurationToProto(plannedReparentShardOptions.WaitReplicasTimeout),
})
if err != nil {
Expand Down Expand Up @@ -280,6 +300,7 @@ func commandTabletExternallyReparented(cmd *cobra.Command, args []string) error
func init() {
EmergencyReparentShard.Flags().DurationVar(&emergencyReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up in reparenting.")
EmergencyReparentShard.Flags().StringVar(&emergencyReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary. If not specified, the vtctld will select the best candidate to promote.")
EmergencyReparentShard.Flags().StringVar(&emergencyReparentShardOptions.ExpectedPrimaryAliasStr, "expected-primary", "", "Alias of a tablet that must be the current primary in order for the reparent to be processed.")
EmergencyReparentShard.Flags().BoolVar(&emergencyReparentShardOptions.PreventCrossCellPromotion, "prevent-cross-cell-promotion", false, "Only promotes a new primary from the same cell as the previous primary.")
EmergencyReparentShard.Flags().StringSliceVarP(&emergencyReparentShardOptions.IgnoreReplicaAliasStrList, "ignore-replicas", "i", nil, "Comma-separated, repeated list of replica tablet aliases to ignore during the emergency reparent.")
Root.AddCommand(EmergencyReparentShard)
Expand All @@ -291,6 +312,7 @@ func init() {
PlannedReparentShard.Flags().DurationVar(&plannedReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up on replication both before and after reparenting.")
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary.")
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.AvoidPrimaryAliasStr, "avoid-primary", "", "Alias of a tablet that should not be the primary; i.e. \"reparent to any other tablet if this one is the primary\".")
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.ExpectedPrimaryAliasStr, "expected-primary", "", "Alias of a tablet that must be the current primary in order for the reparent to be processed.")
Root.AddCommand(PlannedReparentShard)

Root.AddCommand(ReparentTablet)
Expand Down
16 changes: 2 additions & 14 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cellAliases: make(map[string]string),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -376,19 +375,8 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}
fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))

topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
36 changes: 30 additions & 6 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
defer ts.Close()
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
filter := NewFilterByKeyspace([]string{"keyspace"})
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -162,10 +163,31 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias)
tw.loadTablets()

// Confirm second tablet triggers ListTablets + AddTablet calls.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

// Check the new tablet is returned by GetAllTablets().
// Add a third tablet in a filtered keyspace to the topology.
tablet3 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 3,
},
Hostname: "host3",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "excluded",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias)
tw.loadTablets()

// Confirm filtered tablet did not trigger an AddTablet call.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0})
checkChecksum(t, tw, 3177315266)

// Check the second tablet is returned by GetAllTablets(). This should not contain the filtered tablet.
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet2)
assert.Len(t, allTablets, 2)
Expand Down Expand Up @@ -197,14 +219,14 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet, allTablets[key]))
assert.NotContains(t, allTablets, origKey)
checkChecksum(t, tw, 2762153755)
checkChecksum(t, tw, 3177315266)
} else {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0})
assert.Len(t, allTablets, 2)
assert.Contains(t, allTablets, origKey)
assert.True(t, proto.Equal(origTablet, allTablets[origKey]))
assert.NotContains(t, allTablets, key)
checkChecksum(t, tw, 2762153755)
checkChecksum(t, tw, 3177315266)
}

// Both tablets restart on different hosts.
Expand Down Expand Up @@ -260,7 +282,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
checkChecksum(t, tw, 789108290)
checkChecksum(t, tw, 852159264)

allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 1)
Expand All @@ -271,8 +293,10 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet2, allTablets[key]))

// Remove the other and check that it is detected as being gone.
// Remove the other tablets and check that it is detected as being gone.
// Deleting the filtered tablet should not trigger a RemoveTablet call.
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))
require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias))
_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
Expand Down
44 changes: 27 additions & 17 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
streamModeTar = "tar"
xtrabackupBinaryName = "xtrabackup"
xtrabackupEngineName = "xtrabackup"
xtrabackupInfoFile = "xtrabackup_info"
xbstream = "xbstream"

// closeTimeout is the timeout for closing backup files after writing.
Expand Down Expand Up @@ -238,15 +239,22 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
return true, nil
}

func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) {

func (be *XtrabackupEngine) backupFiles(
ctx context.Context,
params BackupParams,
bh backupstorage.BackupHandle,
backupFileName string,
numStripes int,
flavor string,
) (replicationPosition mysql.Position, finalErr error) {
backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName)
flagsToExec := []string{"--defaults-file=" + params.Cnf.Path,
"--backup",
"--socket=" + params.Cnf.SocketFile,
"--slave-info",
"--user=" + xtrabackupUser,
"--target-dir=" + params.Cnf.TmpDir,
"--extra-lsndir=" + params.Cnf.TmpDir,
}
if xtrabackupStreamMode != "" {
flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode)
Expand Down Expand Up @@ -345,27 +353,14 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
// the replication position. Note that if we don't read stderr as we go, the
// xtrabackup process gets blocked when the write buffer fills up.
stderrBuilder := &strings.Builder{}
posBuilder := &strings.Builder{}
stderrDone := make(chan struct{})
go func() {
defer close(stderrDone)

scanner := bufio.NewScanner(backupErr)
capture := false
for scanner.Scan() {
line := scanner.Text()
params.Logger.Infof("xtrabackup stderr: %s", line)

// Wait until we see the first line of the binlog position.
// Then capture all subsequent lines. We need multiple lines since
// the value we're looking for has newlines in it.
if !capture {
if !strings.Contains(line, "MySQL binlog position") {
continue
}
capture = true
}
fmt.Fprintln(posBuilder, line)
}
if err := scanner.Err(); err != nil {
params.Logger.Errorf("error reading from xtrabackup stderr: %v", err)
Expand Down Expand Up @@ -409,8 +404,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput))
}

posOutput := posBuilder.String()
replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger)
replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger)
if rerr != nil {
return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position")
}
Expand Down Expand Up @@ -694,6 +688,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
return nil
}

// findReplicationPositionFromXtrabackupInfo reads the xtrabackup_info file
// located in directory and extracts the replication position from its
// contents by delegating to findReplicationPosition.
//
// It returns an INVALID_ARGUMENT error when the file cannot be opened or
// read. The underlying OS error is included in the message so the cause
// (missing file, permissions, I/O failure) is not lost. Any error from
// findReplicationPosition is returned unchanged.
func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (mysql.Position, error) {
	// Build the path once so the open call and the error message agree.
	infoPath := path.Join(directory, xtrabackupInfoFile)

	f, err := os.Open(infoPath)
	if err != nil {
		return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT,
			"couldn't open %q to read GTID position: %v", infoPath, err)
	}
	defer f.Close()

	contents, err := io.ReadAll(f)
	if err != nil {
		return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT,
			"couldn't read GTID position from %q: %v", f.Name(), err)
	}

	return findReplicationPosition(string(contents), flavor, logger)
}

// xtrabackupReplicationPositionRegexp matches the text
// "GTID of the last change '<value>'" and captures <value>. The character
// class excludes only the single quote, so the captured value may span
// multiple lines.
var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`)

func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql.Position, error) {
Expand Down
46 changes: 36 additions & 10 deletions go/vt/mysqlctl/xtrabackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"bytes"
"io"
"math/rand"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/logutil"
)

Expand Down Expand Up @@ -51,26 +55,48 @@ func TestFindReplicationPosition(t *testing.T) {
}
}

func TestFindReplicationPositionNoMatch(t *testing.T) {
// TestFindReplicationPositionFromXtrabackupInfo writes a sample
// xtrabackup_info file whose GTID set spans several lines into a temporary
// directory and checks that the position parsed from that directory matches
// the expected single-line GTID set.
func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) {
	const wantPosition = "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262"

	infoContents := `tool_version = 8.0.35-30
binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,
1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,
47b59de1-b368-11e9-b48b-624401d35560:1-152981,
557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,
599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,
b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262'
format = xbstream
`

	// Stage the info file inside a scratch directory owned by this test.
	backupDir, err := os.MkdirTemp(t.TempDir(), "test")
	assert.NoError(t, err)

	infoPath := path.Join(backupDir, xtrabackupInfoFile)
	assert.NoError(t, os.WriteFile(infoPath, []byte(infoContents), 0666))

	// Parse the position back out of the directory and compare.
	gotPosition, err := findReplicationPositionFromXtrabackupInfo(backupDir, "MySQL56", logutil.NewConsoleLogger())
	assert.NoError(t, err)
	assert.Equal(t, wantPosition, gotPosition.String())
}

func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `nothing`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
if err == nil {
t.Fatalf("expected error from findReplicationPosition but got nil")
}
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

func TestFindReplicationPositionEmptyMatch(t *testing.T) {
func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `GTID of the last change '
'`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
if err == nil {
t.Fatalf("expected error from findReplicationPosition but got nil")
}
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

func TestStripeRoundTrip(t *testing.T) {
Expand Down
Loading

0 comments on commit 13ba09d

Please sign in to comment.