Skip to content

Commit

Permalink
Signed-off-by: Andres Taylor <[email protected]>
Browse files Browse the repository at this point in the history
test: make sure to report what goes wrong when cluster can't start

Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Oct 31, 2024
1 parent ffaeba0 commit c172984
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 214 deletions.
18 changes: 9 additions & 9 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
var mysqlctlProcessList []*exec.Cmd
var mysqlctlProcessList []*processInfo
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
Expand Down Expand Up @@ -514,7 +514,7 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri

// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err := proc.Wait(); err != nil {
if err := proc.wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
return nil, err
}
Expand Down Expand Up @@ -560,13 +560,13 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard
}
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
var mysqlctlProcessList []*exec.Cmd
var mysqlctlProcessList []*processInfo
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
mysqlctlProcessList = []*exec.Cmd{}
mysqlctlProcessList = []*processInfo{}
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
Expand Down Expand Up @@ -629,7 +629,7 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard

// wait till all mysqlctl is instantiated
for _, proc := range mysqlctlProcessList {
if err = proc.Wait(); err != nil {
if err = proc.wait(); err != nil {
log.Errorf("unable to start mysql process %v: %v", proc, err)
return err
}
Expand Down Expand Up @@ -1061,7 +1061,7 @@ func (cluster *LocalProcessCluster) Teardown() {
}
}

var mysqlctlProcessList []*exec.Cmd
var mysqlctlProcessList []*processInfo
var mysqlctlTabletUIDs []int
for _, keyspace := range cluster.Keyspaces {
for _, shard := range keyspace.Shards {
Expand Down Expand Up @@ -1107,17 +1107,17 @@ func (cluster *LocalProcessCluster) Teardown() {
cluster.teardownCompleted = true
}

func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessList []*exec.Cmd, mysqlctlTabletUIDs []int) {
func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessList []*processInfo, mysqlctlTabletUIDs []int) {
wg := sync.WaitGroup{}
for i, cmd := range mysqlctlProcessList {
wg.Add(1)
go func(cmd *exec.Cmd, tabletUID int) {
go func(cmd *processInfo, tabletUID int) {
defer func() {
wg.Done()
}()
exit := make(chan error)
go func() {
exit <- cmd.Wait()
exit <- cmd.wait()
}()
select {
case <-time.After(30 * time.Second):
Expand Down
85 changes: 85 additions & 0 deletions go/test/endtoend/cluster/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"bytes"
"fmt"
"os"
"os/exec"
)

type processInfo struct {
proc *exec.Cmd
stdout bytes.Buffer
stderr bytes.Buffer
}

func newCommand(name string, arg ...string) *processInfo {
pi := &processInfo{
proc: exec.Command(name, arg...),
}
pi.proc.Stdout = &pi.stdout
pi.proc.Stderr = &pi.stderr
return pi
}

func (pi *processInfo) addArgs(arg ...string) {
pi.proc.Args = append(pi.proc.Args, arg...)
}

func (pi *processInfo) getArgs() []string {
return pi.proc.Args
}

func (pi *processInfo) addEnv(arg ...string) {
pi.proc.Env = append(pi.proc.Env, arg...)
}

func (pi *processInfo) failed(err error) error {
if err == nil {
return nil
}
message := "failed to run %v: \n%w"
if out := pi.stdout.String(); out != "" {
message += fmt.Sprintf("\nstdout: %s", out)
}
if out := pi.stderr.String(); out != "" {
message += fmt.Sprintf("\nstderr: %s", out)
}

return fmt.Errorf(message, pi.proc.Args, err)
}

func (pi *processInfo) start() error {
err := pi.proc.Start()
return pi.failed(err)
}

func (pi *processInfo) wait() error {
err := pi.proc.Wait()
return pi.failed(err)
}

func (pi *processInfo) process() *os.Process {
return pi.proc.Process
}

func (pi *processInfo) run() error {
err := pi.proc.Run()
return pi.failed(err)
}
50 changes: 25 additions & 25 deletions go/test/endtoend/cluster/mysqlctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (mysqlctl *MysqlctlProcess) Start() (err error) {
if err != nil {
return err
}
return tmpProcess.Wait()
return tmpProcess.wait()
}

// StartProvideInit executes mysqlctl command to start mysql instance
Expand All @@ -87,27 +87,27 @@ func (mysqlctl *MysqlctlProcess) StartProvideInit(init bool) (err error) {
if err != nil {
return err
}
return tmpProcess.Wait()
return tmpProcess.wait()
}

// StartProcess starts the mysqlctl and returns the process reference
func (mysqlctl *MysqlctlProcess) StartProcess() (*exec.Cmd, error) {
func (mysqlctl *MysqlctlProcess) StartProcess() (*processInfo, error) {
return mysqlctl.startProcess(true)
}

func (mysqlctl *MysqlctlProcess) startProcess(init bool) (*exec.Cmd, error) {
tmpProcess := exec.Command(
func (mysqlctl *MysqlctlProcess) startProcess(init bool) (*processInfo, error) {
tmpProcess := newCommand(
mysqlctl.Binary,
"--log_dir", mysqlctl.LogDirectory,
"--tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID),
"--mysql_port", fmt.Sprintf("%d", mysqlctl.MySQLPort),
)
if *isCoverage {
tmpProcess.Args = append(tmpProcess.Args, []string{"--test.coverprofile=" + getCoveragePath("mysql-start.out")}...)
tmpProcess.addArgs("--test.coverprofile=" + getCoveragePath("mysql-start.out"))
}

if len(mysqlctl.ExtraArgs) > 0 {
tmpProcess.Args = append(tmpProcess.Args, mysqlctl.ExtraArgs...)
tmpProcess.addArgs(mysqlctl.ExtraArgs...)
}
if mysqlctl.InitMysql {
if mysqlctl.SecureTransport {
Expand Down Expand Up @@ -145,27 +145,27 @@ ssl_key={{.ServerKey}}
return nil, err
}

tmpProcess.Env = append(tmpProcess.Env, "EXTRA_MY_CNF="+extraMyCNF)
tmpProcess.Env = append(tmpProcess.Env, "VTDATAROOT="+os.Getenv("VTDATAROOT"))
tmpProcess.addEnv("EXTRA_MY_CNF=" + extraMyCNF)
tmpProcess.addEnv("VTDATAROOT=" + os.Getenv("VTDATAROOT"))
}

if init {
tmpProcess.Args = append(tmpProcess.Args, "init")
tmpProcess.addArgs("init")
if mysqlctl.MajorVersion < 18 {
tmpProcess.Args = append(tmpProcess.Args, "--")
tmpProcess.addArgs("--")
}

tmpProcess.Args = append(tmpProcess.Args, "--init_db_sql_file", mysqlctl.InitDBFile)
tmpProcess.addArgs("--init_db_sql_file", mysqlctl.InitDBFile)
} else {
tmpProcess.Args = append(tmpProcess.Args, "start")
tmpProcess.addArgs("start")
}
} else {
tmpProcess.Args = append(tmpProcess.Args, "start")
tmpProcess.addArgs("start")
}
tmpProcess.Env = append(tmpProcess.Env, os.Environ()...)
tmpProcess.Env = append(tmpProcess.Env, DefaultVttestEnv)
log.Infof("Starting mysqlctl with command: %v", tmpProcess.Args)
return tmpProcess, tmpProcess.Start()
tmpProcess.addEnv(os.Environ()...)
tmpProcess.addEnv(DefaultVttestEnv)
log.Infof("Starting mysqlctl with command: %v", tmpProcess.getArgs())
return tmpProcess, tmpProcess.start()
}

// Stop executes mysqlctl command to stop mysql instance and kills the mysql instance
Expand All @@ -183,7 +183,7 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) {
// To prevent this process for hanging for 5 minutes, we will add a 30-second timeout.
exit := make(chan error)
go func() {
exit <- tmpProcess.Wait()
exit <- tmpProcess.wait()
}()
select {
case <-time.After(30 * time.Second):
Expand Down Expand Up @@ -224,20 +224,20 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) {
}

// StopProcess executes mysqlctl command to stop mysql instance and returns process reference
func (mysqlctl *MysqlctlProcess) StopProcess() (*exec.Cmd, error) {
tmpProcess := exec.Command(
func (mysqlctl *MysqlctlProcess) StopProcess() (*processInfo, error) {
tmpProcess := newCommand(
mysqlctl.Binary,
"--log_dir", mysqlctl.LogDirectory,
"--tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID),
)
if *isCoverage {
tmpProcess.Args = append(tmpProcess.Args, []string{"--test.coverprofile=" + getCoveragePath("mysql-stop.out")}...)
tmpProcess.addArgs([]string{"--test.coverprofile=" + getCoveragePath("mysql-stop.out")}...)
}
if len(mysqlctl.ExtraArgs) > 0 {
tmpProcess.Args = append(tmpProcess.Args, mysqlctl.ExtraArgs...)
tmpProcess.addArgs(mysqlctl.ExtraArgs...)
}
tmpProcess.Args = append(tmpProcess.Args, "shutdown")
return tmpProcess, tmpProcess.Start()
tmpProcess.addArgs("shutdown")
return tmpProcess, tmpProcess.start()
}

func (mysqlctl *MysqlctlProcess) BasePath() string {
Expand Down
36 changes: 10 additions & 26 deletions go/test/endtoend/cluster/mysqlctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type MysqlctldProcess struct {
MySQLPort int
InitDBFile string
ExtraArgs []string
process *exec.Cmd
process *processInfo
exit chan error
InitMysql bool
SocketFile string
Expand Down Expand Up @@ -80,38 +80,28 @@ func (mysqlctld *MysqlctldProcess) Start() error {
if mysqlctld.SocketFile != "" {
args = append(args, "--socket_file", mysqlctld.SocketFile)
}
tempProcess := exec.Command(
tempProcess := newCommand(
mysqlctld.Binary,
args...,
)

tempProcess.Args = append(tempProcess.Args, mysqlctld.ExtraArgs...)
tempProcess.addArgs(mysqlctld.ExtraArgs...)

if mysqlctld.InitMysql {
tempProcess.Args = append(tempProcess.Args,
"--init_db_sql_file", mysqlctld.InitDBFile)
tempProcess.addArgs("--init_db_sql_file", mysqlctld.InitDBFile)
}

err := os.MkdirAll(mysqlctld.LogDirectory, 0755)
if err != nil {
log.Errorf("Failed to create directory for mysqlctld logs: %v", err)
return err
}
errFile, err := os.Create(path.Join(mysqlctld.LogDirectory, "mysqlctld-stderr.txt"))
if err != nil {
log.Errorf("Failed to create directory for mysqlctld stderr: %v", err)
}
tempProcess.Stderr = errFile

tempProcess.Env = append(tempProcess.Env, os.Environ()...)
tempProcess.Env = append(tempProcess.Env, DefaultVttestEnv)
tempProcess.Stdout = os.Stdout
tempProcess.Stderr = os.Stderr
mysqlctld.ErrorLog = errFile.Name()

log.Infof("%v", strings.Join(tempProcess.Args, " "))
tempProcess.addEnv(os.Environ()...)
tempProcess.addEnv(DefaultVttestEnv)
log.Infof("%v", strings.Join(tempProcess.getArgs(), " "))

err = tempProcess.Start()
err = tempProcess.start()
if err != nil {
return err
}
Expand All @@ -120,15 +110,9 @@ func (mysqlctld *MysqlctldProcess) Start() error {

mysqlctld.exit = make(chan error)
go func(mysqlctld *MysqlctldProcess) {
err := mysqlctld.process.Wait()
err := mysqlctld.process.wait()
if !mysqlctld.exitSignalReceived {
errBytes, ferr := os.ReadFile(mysqlctld.ErrorLog)
if ferr == nil {
log.Errorf("mysqlctld error log contents:\n%s", string(errBytes))
} else {
log.Errorf("Failed to read the mysqlctld error log file %q: %v", mysqlctld.ErrorLog, ferr)
}
fmt.Printf("mysqlctld stopped unexpectedly, tabletUID %v, mysql port %v, PID %v\n", mysqlctld.TabletUID, mysqlctld.MySQLPort, mysqlctld.process.Process.Pid)
fmt.Printf("mysqlctld stopped unexpectedly, tabletUID %v, mysql port %v, PID %v\n", mysqlctld.TabletUID, mysqlctld.MySQLPort, mysqlctld.process.process().Pid)
}
mysqlctld.process = nil
mysqlctld.exitSignalReceived = false
Expand Down
Loading

0 comments on commit c172984

Please sign in to comment.