Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Improve Error Reporting for Cluster Failures in End-to-End Tests #17125

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 40 additions & 40 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
cluster.VtctlProcess = *VtctlProcessInstance(cluster.TopoProcess.Port, cluster.Hostname)
if !cluster.ReusingVTDATAROOT {
if err = cluster.VtctlProcess.AddCellInfo(cluster.Cell); err != nil {
log.Error(err)
log.Error(err.Error())
return
}
cluster.VtctlProcess.LogDir = cluster.TmpDirectory
Expand Down Expand Up @@ -409,15 +409,15 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
// Apply Schema SQL
if keyspace.SchemaSQL != "" {
if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err.Error())
return
}
}

// Apply VSchema
if keyspace.VSchema != "" {
if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err.Error())
return
}
}
Expand All @@ -426,7 +426,7 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames

err = cluster.StartVTOrc(keyspace.Name)
if err != nil {
log.Errorf("Error starting VTOrc - %v", err)
log.Errorf("Error starting VTOrc - %v", err.Error())
return err
}
}
Expand All @@ -439,7 +439,7 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
var mysqlctlProcessList []*exec.Cmd
var mysqlctlProcessList []*processInfo
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
Expand Down Expand Up @@ -471,7 +471,7 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri
tablet.MysqlctlProcess = *mysqlctlProcess
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err.Error())
return nil, err
}
mysqlctlProcessList = append(mysqlctlProcessList, proc)
Expand Down Expand Up @@ -514,23 +514,23 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri

// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err := proc.Wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
if err := proc.wait(); err != nil {
log.Errorf("unable to start mysql process\n%v\n%s", proc.proc.String(), err.Error())
return nil, err
}
}
for _, tablet := range shard.Vttablets {
log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)

if err := tablet.VttabletProcess.Setup(); err != nil {
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %w", tablet.TabletUID, tablet.GrpcPort, err.Error())
return nil, err
}
}

// Make first tablet as primary
if err := cluster.VtctldClientProcess.InitializeShard(keyspaceName, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspaceName, shardName, err)
log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspaceName, shardName, err.Error())
return nil, err
}
return shard, nil
Expand Down Expand Up @@ -560,13 +560,13 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard
}
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
var mysqlctlProcessList []*exec.Cmd
var mysqlctlProcessList []*processInfo
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
mysqlctlProcessList = []*exec.Cmd{}
mysqlctlProcessList = []*processInfo{}
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
Expand All @@ -592,7 +592,7 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard
tablet.MysqlctlProcess = *mysqlctlProcess
proc, err := tablet.MysqlctlProcess.StartProcess()
if err != nil {
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err.Error())
return err
}
mysqlctlProcessList = append(mysqlctlProcessList, proc)
Expand Down Expand Up @@ -629,30 +629,30 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard

// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err = proc.Wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
if err = proc.wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err.Error())
return err
}
}
for _, tablet := range shard.Vttablets {
if !cluster.ReusingVTDATAROOT {
if _, err = tablet.VttabletProcess.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name), keyspace.Name, false); err != nil {
log.Errorf("error creating database for keyspace %v: %v", keyspace.Name, err)
log.Errorf("error creating database for keyspace %v: %v", keyspace.Name, err.Error())
return
}
}

log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)

if err = tablet.VttabletProcess.Setup(); err != nil {
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err.Error())
return
}
}

// Make first tablet as primary
if err = cluster.VtctldClientProcess.InitShardPrimary(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
log.Errorf("error running ISM on keyspace %v, shard %v: %v", keyspace.Name, shardName, err)
log.Errorf("error running ISM on keyspace %v, shard %v: %v", keyspace.Name, shardName, err.Error())
return
}
keyspace.Shards = append(keyspace.Shards, *shard)
Expand All @@ -672,15 +672,15 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard
// Apply Schema SQL
if keyspace.SchemaSQL != "" {
if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err.Error())
return
}
}

// Apply VSchema
if keyspace.VSchema != "" {
if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err.Error())
return
}
}
Expand All @@ -704,7 +704,7 @@ func (cluster *LocalProcessCluster) SetupCluster(keyspace *Keyspace, shards []Sh
// Create Keyspace
err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
if err != nil {
log.Error(err)
log.Error(err.Error())
return
}
}
Expand Down Expand Up @@ -818,7 +818,7 @@ func NewCluster(cell string, hostname string) *LocalProcessCluster {

err := cluster.populateVersionInfo()
if err != nil {
log.Errorf("Error populating version information - %v", err)
log.Errorf("Error populating version information - %v", err.Error())
}
return cluster
}
Expand Down Expand Up @@ -853,12 +853,12 @@ func GetMajorVersion(binaryName string) (int, error) {
func (cluster *LocalProcessCluster) RestartVtgate() (err error) {
err = cluster.VtgateProcess.TearDown()
if err != nil {
log.Errorf("error stopping vtgate %v: %v", cluster.VtgateProcess, err)
log.Errorf("error stopping vtgate %v: %v", cluster.VtgateProcess, err.Error())
return
}
err = cluster.StartVtgate()
if err != nil {
log.Errorf("error starting vtgate %v: %v", cluster.VtgateProcess, err)
log.Errorf("error starting vtgate %v: %v", cluster.VtgateProcess, err.Error())
return
}
return err
Expand Down Expand Up @@ -930,7 +930,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet *
return nil, err
}

conn, err := tabletconn.GetDialer()(ctx, tablet, grpcclient.FailFast(false))
conn, err := tabletconn.GetDialer()(ctx, tablet, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1052,36 +1052,36 @@ func (cluster *LocalProcessCluster) Teardown() {
cluster.CancelFunc()
}
if err := cluster.VtgateProcess.TearDown(); err != nil {
log.Errorf("Error in vtgate teardown: %v", err)
log.Errorf("Error in vtgate teardown: %v", err.Error())
}

for _, vtorcProcess := range cluster.VTOrcProcesses {
if err := vtorcProcess.TearDown(); err != nil {
log.Errorf("Error in vtorc teardown: %v", err)
log.Errorf("Error in vtorc teardown: %v", err.Error())
}
}

var mysqlctlProcessList []*exec.Cmd
var mysqlctlProcessList []*processInfo
var mysqlctlTabletUIDs []int
for _, keyspace := range cluster.Keyspaces {
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Vttablets {
if tablet.MysqlctlProcess.TabletUID > 0 {
if proc, err := tablet.MysqlctlProcess.StopProcess(); err != nil {
log.Errorf("Error in mysqlctl teardown: %v", err)
log.Errorf("Error in mysqlctl teardown: %v", err.Error())
} else {
mysqlctlProcessList = append(mysqlctlProcessList, proc)
mysqlctlTabletUIDs = append(mysqlctlTabletUIDs, tablet.MysqlctlProcess.TabletUID)
}
}
if tablet.MysqlctldProcess.TabletUID > 0 {
if err := tablet.MysqlctldProcess.Stop(); err != nil {
log.Errorf("Error in mysqlctl teardown: %v", err)
log.Errorf("Error in mysqlctl teardown: %v", err.Error())
}
}

if err := tablet.VttabletProcess.TearDown(); err != nil {
log.Errorf("Error in vttablet teardown: %v", err)
log.Errorf("Error in vttablet teardown: %v", err.Error())
}
}
}
Expand All @@ -1094,11 +1094,11 @@ func (cluster *LocalProcessCluster) Teardown() {
cluster.waitForMySQLProcessToExit(mysqlctlProcessList, mysqlctlTabletUIDs)

if err := cluster.VtctldProcess.TearDown(); err != nil {
log.Errorf("Error in vtctld teardown: %v", err)
log.Errorf("Error in vtctld teardown: %v", err.Error())
}

if err := cluster.TopoProcess.TearDown(cluster.Cell, cluster.OriginalVTDATAROOT, cluster.CurrentVTDATAROOT, *keepData, *topoFlavor); err != nil {
log.Errorf("Error in topo server teardown: %v", err)
log.Errorf("Error in topo server teardown: %v", err.Error())
}

// reset the VTDATAROOT path.
Expand All @@ -1107,17 +1107,17 @@ func (cluster *LocalProcessCluster) Teardown() {
cluster.teardownCompleted = true
}

func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessList []*exec.Cmd, mysqlctlTabletUIDs []int) {
func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessList []*processInfo, mysqlctlTabletUIDs []int) {
wg := sync.WaitGroup{}
for i, cmd := range mysqlctlProcessList {
wg.Add(1)
go func(cmd *exec.Cmd, tabletUID int) {
go func(cmd *processInfo, tabletUID int) {
defer func() {
wg.Done()
}()
exit := make(chan error)
go func() {
exit <- cmd.Wait()
exit <- cmd.wait()
}()
select {
case <-time.After(30 * time.Second):
Expand All @@ -1126,7 +1126,7 @@ func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessLis
if err == nil {
return
}
log.Errorf("Error in mysqlctl teardown wait: %v", err)
log.Errorf("Error in mysqlctl teardown wait: %v", err.Error())
break
}
pidFile := path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.pid", tabletUID))
Expand All @@ -1138,12 +1138,12 @@ func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessLis
}
pid, err := strconv.Atoi(strings.TrimSpace(string(pidBytes)))
if err != nil {
log.Errorf("Error in conversion to integer: %v", err)
log.Errorf("Error in conversion to integer: %v", err.Error())
return
}
err = syscallutil.Kill(pid, syscall.SIGKILL)
if err != nil {
log.Errorf("Error in killing process: %v", err)
log.Errorf("Error in killing process: %v", err.Error())
}
}(cmd, mysqlctlTabletUIDs[i])
}
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func (cluster *LocalProcessCluster) GetAndReservePort() int {
ln, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(cluster.nextPortForProcess)))

if err != nil {
log.Errorf("Can't listen on port %v: %s, trying next port", cluster.nextPortForProcess, err)
log.Errorf("Can't listen on port %v: %s, trying next port", cluster.nextPortForProcess, err.Error())
continue
}

Expand Down
85 changes: 85 additions & 0 deletions go/test/endtoend/cluster/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"bytes"
"fmt"
"os"
"os/exec"
)

// processInfo wraps an exec.Cmd together with in-memory buffers that capture
// everything the child process writes to stdout and stderr, so that the output
// can be attached to the error reported when the process fails.
type processInfo struct {
	proc   *exec.Cmd
	stdout bytes.Buffer
	stderr bytes.Buffer
}

// newCommand builds a processInfo for the program name with the given
// arguments and points the command's Stdout and Stderr at the processInfo's
// own buffers. The command is not started.
func newCommand(name string, arg ...string) *processInfo {
	pi := &processInfo{
		proc: exec.Command(name, arg...),
	}
	pi.proc.Stdout = &pi.stdout
	pi.proc.Stderr = &pi.stderr
	return pi
}

// addArgs appends arg to the command line of the wrapped command.
func (pi *processInfo) addArgs(arg ...string) {
	pi.proc.Args = append(pi.proc.Args, arg...)
}

// getArgs returns the full argument vector of the wrapped command, including
// the program name in position zero.
func (pi *processInfo) getArgs() []string {
	return pi.proc.Args
}

// addEnv appends the given "KEY=value" entries to the environment of the
// wrapped command.
//
// NOTE(review): per the os/exec documentation a nil Env makes the child
// inherit the parent's environment; once entries are appended the child only
// sees what is listed here. Confirm callers seed the environment (for example
// with os.Environ()) when inheritance is expected.
func (pi *processInfo) addEnv(arg ...string) {
	pi.proc.Env = append(pi.proc.Env, arg...)
}

// failed decorates err with the command line and whatever the process wrote to
// stdout and stderr so far. It returns nil when err is nil. The returned error
// wraps err, so errors.Is and errors.As still see the original error.
func (pi *processInfo) failed(err error) error {
	if err == nil {
		return nil
	}

	// The captured output is passed to Errorf as an operand instead of being
	// spliced into the format string: process output is arbitrary text and any
	// '%' in it would otherwise be interpreted as a formatting verb and mangle
	// the message (e.g. "100% done" -> "100%!d(MISSING)one").
	var output string
	if out := pi.stdout.String(); out != "" {
		output += "\nstdout: " + out
	}
	if out := pi.stderr.String(); out != "" {
		output += "\nstderr: " + out
	}

	return fmt.Errorf("failed to run %v: \n%w%s", pi.proc.Args, err, output)
}

// start launches the wrapped command without waiting for it to finish. A
// failure to start is returned through failed.
func (pi *processInfo) start() error {
	err := pi.proc.Start()
	return pi.failed(err)
}

// wait blocks until the started command exits. A non-nil result from
// exec.Cmd.Wait is returned through failed, which adds the captured output.
func (pi *processInfo) wait() error {
	err := pi.proc.Wait()
	return pi.failed(err)
}

// process returns the underlying os.Process of the wrapped command; it is nil
// until the command has been started.
func (pi *processInfo) process() *os.Process {
	return pi.proc.Process
}

// run starts the wrapped command and waits for it to complete. Any error is
// returned through failed, which adds the captured output.
func (pi *processInfo) run() error {
	err := pi.proc.Run()
	return pi.failed(err)
}
Loading
Loading