Skip to content

Commit

Permalink
Making Reshard work smoothly with Atomic Transactions (#16844)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 3, 2024
1 parent 2e47aba commit 24212e7
Show file tree
Hide file tree
Showing 19 changed files with 648 additions and 143 deletions.
236 changes: 122 additions & 114 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,103 +378,9 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
var mysqlctlProcessList []*exec.Cmd
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
tablet := &Vttablet{
TabletUID: tabletUID,
Type: "replica",
HTTPPort: cluster.GetAndReservePort(),
GrpcPort: cluster.GetAndReservePort(),
MySQLPort: cluster.GetAndReservePort(),
Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID),
}
if i == 0 { // Make the first one as primary
tablet.Type = "primary"
} else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed
tablet.Type = "rdonly"
}
// Start Mysqlctl process
log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT)
if err != nil {
return err
}
switch tablet.Type {
case "primary":
mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.MysqlctlProcess = *mysqlctlProcess
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
return err
}
mysqlctlProcessList = append(mysqlctlProcessList, proc)

// start vttablet process
tablet.VttabletProcess = VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
cluster.Cell,
shardName,
keyspace.Name,
cluster.VtctldProcess.Port,
tablet.Type,
cluster.TopoProcess.Port,
cluster.Hostname,
cluster.TmpDirectory,
cluster.VtTabletExtraArgs,
cluster.DefaultCharset)
switch tablet.Type {
case "primary":
tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.Alias = tablet.VttabletProcess.TabletPath
if cluster.ReusingVTDATAROOT {
tablet.VttabletProcess.ServingStatus = "SERVING"
}
shard.Vttablets = append(shard.Vttablets, tablet)
// Apply customizations
for _, customizer := range customizers {
if f, ok := customizer.(func(*VttabletProcess)); ok {
f(tablet.VttabletProcess)
} else {
return fmt.Errorf("type mismatch on customizer: %T", customizer)
}
}
}

// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err = proc.Wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
return err
}
}
for _, tablet := range shard.Vttablets {
log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)

if err = tablet.VttabletProcess.Setup(); err != nil {
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
return
}
}

// Make first tablet as primary
if err = cluster.VtctldClientProcess.InitializeShard(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspace.Name, shardName, err)
return
shard, err := cluster.AddShard(keyspace.Name, shardName, totalTabletsRequired, rdonly, customizers)
if err != nil {
return err
}
keyspace.Shards = append(keyspace.Shards, *shard)
}
Expand All @@ -488,33 +394,135 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
}
if !existingKeyspace {
cluster.Keyspaces = append(cluster.Keyspaces, keyspace)
}

// Apply Schema SQL
if keyspace.SchemaSQL != "" {
if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
return
// Apply Schema SQL
if keyspace.SchemaSQL != "" {
if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
return
}
}

// Apply VSchema
if keyspace.VSchema != "" {
if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
return
}
}

log.Infof("Done creating keyspace: %v ", keyspace.Name)

err = cluster.StartVTOrc(keyspace.Name)
if err != nil {
log.Errorf("Error starting VTOrc - %v", err)
return err
}
}

// Apply VSchema
if keyspace.VSchema != "" {
if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
return
return
}

func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName string, totalTabletsRequired int, rdonly bool, customizers []any) (*Shard, error) {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
var mysqlctlProcessList []*exec.Cmd
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
tablet := &Vttablet{
TabletUID: tabletUID,
Type: "replica",
HTTPPort: cluster.GetAndReservePort(),
GrpcPort: cluster.GetAndReservePort(),
MySQLPort: cluster.GetAndReservePort(),
Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID),
}
if i == 0 { // Make the first one as primary
tablet.Type = "primary"
} else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed
tablet.Type = "rdonly"
}
// Start Mysqlctl process
log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT)
if err != nil {
return nil, err
}
switch tablet.Type {
case "primary":
mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.MysqlctlProcess = *mysqlctlProcess
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
return nil, err
}
mysqlctlProcessList = append(mysqlctlProcessList, proc)

// start vttablet process
tablet.VttabletProcess = VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
cluster.Cell,
shardName,
keyspaceName,
cluster.VtctldProcess.Port,
tablet.Type,
cluster.TopoProcess.Port,
cluster.Hostname,
cluster.TmpDirectory,
cluster.VtTabletExtraArgs,
cluster.DefaultCharset)
switch tablet.Type {
case "primary":
tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
case "replica":
tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
}
tablet.Alias = tablet.VttabletProcess.TabletPath
if cluster.ReusingVTDATAROOT {
tablet.VttabletProcess.ServingStatus = "SERVING"
}
shard.Vttablets = append(shard.Vttablets, tablet)
// Apply customizations
for _, customizer := range customizers {
if f, ok := customizer.(func(*VttabletProcess)); ok {
f(tablet.VttabletProcess)
} else {
return nil, fmt.Errorf("type mismatch on customizer: %T", customizer)
}
}
}

log.Infof("Done creating keyspace: %v ", keyspace.Name)
// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err := proc.Wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
return nil, err
}
}
for _, tablet := range shard.Vttablets {
log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)

err = cluster.StartVTOrc(keyspace.Name)
if err != nil {
log.Errorf("Error starting VTOrc - %v", err)
return err
if err := tablet.VttabletProcess.Setup(); err != nil {
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
return nil, err
}
}

return
// Make first tablet as primary
if err := cluster.VtctldClientProcess.InitializeShard(keyspaceName, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspaceName, shardName, err)
return nil, err
}
return shard, nil
}

// StartUnshardedKeyspaceLegacy starts unshared keyspace with shard name as "0"
Expand Down
102 changes: 102 additions & 0 deletions go/test/endtoend/cluster/reshard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"fmt"
"slices"
"strings"
"testing"
"time"
)

// ReshardWorkflow is used to store the information needed to run
// Reshard commands.
type ReshardWorkflow struct {
t *testing.T
clusterInstance *LocalProcessCluster
workflowName string
targetKs string
sourceShards string
targetShards string
}

// NewReshard creates a new ReshardWorkflow.
func NewReshard(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, targetShards, srcShards string) *ReshardWorkflow {
return &ReshardWorkflow{
t: t,
clusterInstance: clusterInstance,
workflowName: workflowName,
targetKs: targetKs,
sourceShards: srcShards,
targetShards: targetShards,
}
}

func (rw *ReshardWorkflow) Create() (string, error) {
args := []string{"Create"}
if rw.sourceShards != "" {
args = append(args, "--source-shards="+rw.sourceShards)
}
if rw.targetShards != "" {
args = append(args, "--target-shards="+rw.targetShards)
}

return rw.exec(args...)
}

func (rw *ReshardWorkflow) exec(args ...string) (string, error) {
args2 := []string{"Reshard", "--workflow=" + rw.workflowName, "--target-keyspace=" + rw.targetKs}
args2 = append(args2, args...)
return rw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
}

func (rw *ReshardWorkflow) SwitchReadsAndWrites() (string, error) {
return rw.exec("SwitchTraffic")
}

func (rw *ReshardWorkflow) ReverseReadsAndWrites() (string, error) {
return rw.exec("ReverseTraffic")
}

func (rw *ReshardWorkflow) Cancel() (string, error) {
return rw.exec("Cancel")
}

func (rw *ReshardWorkflow) Complete() (string, error) {
return rw.exec("Complete")
}

func (rw *ReshardWorkflow) Show() (string, error) {
return rw.exec("Show")
}

func (rw *ReshardWorkflow) WaitForVreplCatchup(timeToWait time.Duration) {
targetShards := strings.Split(rw.targetShards, ",")
for _, ks := range rw.clusterInstance.Keyspaces {
if ks.Name != rw.targetKs {
continue
}
for _, shard := range ks.Shards {
if !slices.Contains(targetShards, shard.Name) {
continue
}
vttablet := shard.PrimaryTablet().VttabletProcess
vttablet.WaitForVReplicationToCatchup(rw.t, rw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait)
}
}
}
Loading

0 comments on commit 24212e7

Please sign in to comment.