diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 9874d368dcd..ea7abdf6ef6 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -439,7 +439,7 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri Name: shardName, } log.Infof("Starting shard: %v", shardName) - var mysqlctlProcessList []*exec.Cmd + var mysqlctlProcessList []*processInfo for i := 0; i < totalTabletsRequired; i++ { // instantiate vttablet object with reserved ports tabletUID := cluster.GetAndReserveTabletUID() @@ -514,7 +514,7 @@ func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName stri // wait till all mysqlctl is instantiated for _, proc := range mysqlctlProcessList { - if err := proc.Wait(); err != nil { + if err := proc.wait(); err != nil { log.Errorf("unable to start mysql process %v: %v", proc, err) return nil, err } @@ -560,13 +560,13 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard } // Create the keyspace if it doesn't already exist. _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy) - var mysqlctlProcessList []*exec.Cmd + var mysqlctlProcessList []*processInfo for _, shardName := range shardNames { shard := &Shard{ Name: shardName, } log.Infof("Starting shard: %v", shardName) - mysqlctlProcessList = []*exec.Cmd{} + mysqlctlProcessList = []*processInfo{} for i := 0; i < totalTabletsRequired; i++ { // instantiate vttablet object with reserved ports tabletUID := cluster.GetAndReserveTabletUID() @@ -629,7 +629,7 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard // wait till all mysqlctl is instantiated for _, proc := range mysqlctlProcessList { - if err = proc.Wait(); err != nil { + if err = proc.wait(); err != nil { log.Errorf("unable to start mysql process %v: %v", proc, err) return err } @@ -1061,7 +1061,7 @@ func (cluster *LocalProcessCluster) Teardown() { } } - var mysqlctlProcessList []*exec.Cmd + var mysqlctlProcessList []*processInfo var mysqlctlTabletUIDs []int for _, keyspace := range cluster.Keyspaces { for _, shard := range keyspace.Shards { @@ -1107,17 +1107,17 @@ func (cluster *LocalProcessCluster) Teardown() { cluster.teardownCompleted = true } -func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessList []*exec.Cmd, mysqlctlTabletUIDs []int) { +func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessList []*processInfo, mysqlctlTabletUIDs []int) { wg := sync.WaitGroup{} for i, cmd := range mysqlctlProcessList { wg.Add(1) - go func(cmd *exec.Cmd, tabletUID int) { + go func(cmd *processInfo, tabletUID int) { defer func() { wg.Done() }() exit := make(chan error) go func() { - exit <- cmd.Wait() + exit <- cmd.wait() }() select { case <-time.After(30 * time.Second): diff --git a/go/test/endtoend/cluster/command.go b/go/test/endtoend/cluster/command.go new file mode 100644 index 00000000000..3c11749d8bd --- /dev/null +++ b/go/test/endtoend/cluster/command.go @@ -0,0 +1,85 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "bytes" + "fmt" + "os" + "os/exec" +) + +type processInfo struct { + proc *exec.Cmd + stdout bytes.Buffer + stderr bytes.Buffer +} + +func newCommand(name string, arg ...string) *processInfo { + pi := &processInfo{ + proc: exec.Command(name, arg...), + } + pi.proc.Stdout = &pi.stdout + pi.proc.Stderr = &pi.stderr + return pi +} + +func (pi *processInfo) addArgs(arg ...string) { + pi.proc.Args = append(pi.proc.Args, arg...) +} + +func (pi *processInfo) getArgs() []string { + return pi.proc.Args +} + +func (pi *processInfo) addEnv(arg ...string) { + pi.proc.Env = append(pi.proc.Env, arg...) +} + +func (pi *processInfo) failed(err error) error { + if err == nil { + return nil + } + message := "failed to run %v: \n%w" + if out := pi.stdout.String(); out != "" { + message += fmt.Sprintf("\nstdout: %s", out) + } + if out := pi.stderr.String(); out != "" { + message += fmt.Sprintf("\nstderr: %s", out) + } + + return fmt.Errorf(message, pi.proc.Args, err) +} + +func (pi *processInfo) start() error { + err := pi.proc.Start() + return pi.failed(err) +} + +func (pi *processInfo) wait() error { + err := pi.proc.Wait() + return pi.failed(err) +} + +func (pi *processInfo) process() *os.Process { + return pi.proc.Process +} + +func (pi *processInfo) run() error { + err := pi.proc.Run() + return pi.failed(err) +} diff --git a/go/test/endtoend/cluster/mysqlctl_process.go b/go/test/endtoend/cluster/mysqlctl_process.go index cfc4fc28088..895f3fb1b34 100644 --- a/go/test/endtoend/cluster/mysqlctl_process.go +++ b/go/test/endtoend/cluster/mysqlctl_process.go @@ -78,7 +78,7 @@ func (mysqlctl *MysqlctlProcess) Start() (err error) { if err != nil { return err } - return tmpProcess.Wait() + return tmpProcess.wait() } // StartProvideInit executes mysqlctl command to start mysql instance @@ -87,27 +87,27 @@ func (mysqlctl *MysqlctlProcess) StartProvideInit(init bool) (err error) { if err != nil { return err } - return tmpProcess.Wait() + return tmpProcess.wait() } // StartProcess starts the mysqlctl and returns the process reference -func (mysqlctl *MysqlctlProcess) StartProcess() (*exec.Cmd, error) { +func (mysqlctl *MysqlctlProcess) StartProcess() (*processInfo, error) { return mysqlctl.startProcess(true) } -func (mysqlctl *MysqlctlProcess) startProcess(init bool) (*exec.Cmd, error) { - tmpProcess := exec.Command( +func (mysqlctl *MysqlctlProcess) startProcess(init bool) (*processInfo, error) { + tmpProcess := newCommand( mysqlctl.Binary, "--log_dir", mysqlctl.LogDirectory, "--tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID), "--mysql_port", fmt.Sprintf("%d", mysqlctl.MySQLPort), ) if *isCoverage { - tmpProcess.Args = append(tmpProcess.Args, []string{"--test.coverprofile=" + getCoveragePath("mysql-start.out")}...) + tmpProcess.addArgs("--test.coverprofile=" + getCoveragePath("mysql-start.out")) } if len(mysqlctl.ExtraArgs) > 0 { - tmpProcess.Args = append(tmpProcess.Args, mysqlctl.ExtraArgs...) + tmpProcess.addArgs(mysqlctl.ExtraArgs...) } if mysqlctl.InitMysql { if mysqlctl.SecureTransport { @@ -145,27 +145,27 @@ ssl_key={{.ServerKey}} return nil, err } - tmpProcess.Env = append(tmpProcess.Env, "EXTRA_MY_CNF="+extraMyCNF) - tmpProcess.Env = append(tmpProcess.Env, "VTDATAROOT="+os.Getenv("VTDATAROOT")) + tmpProcess.addEnv("EXTRA_MY_CNF=" + extraMyCNF) + tmpProcess.addEnv("VTDATAROOT=" + os.Getenv("VTDATAROOT")) } if init { - tmpProcess.Args = append(tmpProcess.Args, "init") + tmpProcess.addArgs("init") if mysqlctl.MajorVersion < 18 { - tmpProcess.Args = append(tmpProcess.Args, "--") + tmpProcess.addArgs("--") } - tmpProcess.Args = append(tmpProcess.Args, "--init_db_sql_file", mysqlctl.InitDBFile) + tmpProcess.addArgs("--init_db_sql_file", mysqlctl.InitDBFile) } else { - tmpProcess.Args = append(tmpProcess.Args, "start") + tmpProcess.addArgs("start") } } else { - tmpProcess.Args = append(tmpProcess.Args, "start") + tmpProcess.addArgs("start") } - tmpProcess.Env = append(tmpProcess.Env, os.Environ()...) - tmpProcess.Env = append(tmpProcess.Env, DefaultVttestEnv) - log.Infof("Starting mysqlctl with command: %v", tmpProcess.Args) - return tmpProcess, tmpProcess.Start() + tmpProcess.addEnv(os.Environ()...) + tmpProcess.addEnv(DefaultVttestEnv) + log.Infof("Starting mysqlctl with command: %v", tmpProcess.getArgs()) + return tmpProcess, tmpProcess.start() } // Stop executes mysqlctl command to stop mysql instance and kills the mysql instance @@ -183,7 +183,7 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) { // To prevent this process for hanging for 5 minutes, we will add a 30-second timeout. exit := make(chan error) go func() { - exit <- tmpProcess.Wait() + exit <- tmpProcess.wait() }() select { case <-time.After(30 * time.Second): @@ -224,20 +224,20 @@ func (mysqlctl *MysqlctlProcess) Stop() (err error) { } // StopProcess executes mysqlctl command to stop mysql instance and returns process reference -func (mysqlctl *MysqlctlProcess) StopProcess() (*exec.Cmd, error) { - tmpProcess := exec.Command( +func (mysqlctl *MysqlctlProcess) StopProcess() (*processInfo, error) { + tmpProcess := newCommand( mysqlctl.Binary, "--log_dir", mysqlctl.LogDirectory, "--tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID), ) if *isCoverage { - tmpProcess.Args = append(tmpProcess.Args, []string{"--test.coverprofile=" + getCoveragePath("mysql-stop.out")}...) + tmpProcess.addArgs([]string{"--test.coverprofile=" + getCoveragePath("mysql-stop.out")}...) } if len(mysqlctl.ExtraArgs) > 0 { - tmpProcess.Args = append(tmpProcess.Args, mysqlctl.ExtraArgs...) + tmpProcess.addArgs(mysqlctl.ExtraArgs...) } - tmpProcess.Args = append(tmpProcess.Args, "shutdown") - return tmpProcess, tmpProcess.Start() + tmpProcess.addArgs("shutdown") + return tmpProcess, tmpProcess.start() } func (mysqlctl *MysqlctlProcess) BasePath() string { diff --git a/go/test/endtoend/cluster/mysqlctld_process.go b/go/test/endtoend/cluster/mysqlctld_process.go index 08409c1246d..105220cdcdd 100644 --- a/go/test/endtoend/cluster/mysqlctld_process.go +++ b/go/test/endtoend/cluster/mysqlctld_process.go @@ -41,7 +41,7 @@ type MysqlctldProcess struct { MySQLPort int InitDBFile string ExtraArgs []string - process *exec.Cmd + process *processInfo exit chan error InitMysql bool SocketFile string @@ -80,16 +80,15 @@ func (mysqlctld *MysqlctldProcess) Start() error { if mysqlctld.SocketFile != "" { args = append(args, "--socket_file", mysqlctld.SocketFile) } - tempProcess := exec.Command( + tempProcess := newCommand( mysqlctld.Binary, args..., ) - tempProcess.Args = append(tempProcess.Args, mysqlctld.ExtraArgs...) + tempProcess.addArgs(mysqlctld.ExtraArgs...) if mysqlctld.InitMysql { - tempProcess.Args = append(tempProcess.Args, - "--init_db_sql_file", mysqlctld.InitDBFile) + tempProcess.addArgs("--init_db_sql_file", mysqlctld.InitDBFile) } err := os.MkdirAll(mysqlctld.LogDirectory, 0755) @@ -97,21 +96,12 @@ func (mysqlctld *MysqlctldProcess) Start() error { log.Errorf("Failed to create directory for mysqlctld logs: %v", err) return err } - errFile, err := os.Create(path.Join(mysqlctld.LogDirectory, "mysqlctld-stderr.txt")) - if err != nil { - log.Errorf("Failed to create directory for mysqlctld stderr: %v", err) - } - tempProcess.Stderr = errFile - - tempProcess.Env = append(tempProcess.Env, os.Environ()...) - tempProcess.Env = append(tempProcess.Env, DefaultVttestEnv) - tempProcess.Stdout = os.Stdout - tempProcess.Stderr = os.Stderr - mysqlctld.ErrorLog = errFile.Name() - log.Infof("%v", strings.Join(tempProcess.Args, " ")) + tempProcess.addEnv(os.Environ()...) + tempProcess.addEnv(DefaultVttestEnv) + log.Infof("%v", strings.Join(tempProcess.getArgs(), " ")) - err = tempProcess.Start() + err = tempProcess.start() if err != nil { return err } @@ -120,15 +110,9 @@ func (mysqlctld *MysqlctldProcess) Start() error { mysqlctld.exit = make(chan error) go func(mysqlctld *MysqlctldProcess) { - err := mysqlctld.process.Wait() + err := mysqlctld.process.wait() if !mysqlctld.exitSignalReceived { - errBytes, ferr := os.ReadFile(mysqlctld.ErrorLog) - if ferr == nil { - log.Errorf("mysqlctld error log contents:\n%s", string(errBytes)) - } else { - log.Errorf("Failed to read the mysqlctld error log file %q: %v", mysqlctld.ErrorLog, ferr) - } - fmt.Printf("mysqlctld stopped unexpectedly, tabletUID %v, mysql port %v, PID %v\n", mysqlctld.TabletUID, mysqlctld.MySQLPort, mysqlctld.process.Process.Pid) + fmt.Printf("mysqlctld stopped unexpectedly, tabletUID %v, mysql port %v, PID %v\n", mysqlctld.TabletUID, mysqlctld.MySQLPort, mysqlctld.process.process().Pid) } mysqlctld.process = nil mysqlctld.exitSignalReceived = false diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go index d5d5c8482a0..5cad4285513 100644 --- a/go/test/endtoend/cluster/topo_process.go +++ b/go/test/endtoend/cluster/topo_process.go @@ -23,7 +23,6 @@ import ( "net" "net/http" "os" - "os/exec" "path" "strings" "syscall" @@ -58,7 +57,7 @@ type TopoProcess struct { Client interface{} Server *vtopo.Server - proc *exec.Cmd + proc *processInfo exit chan error } @@ -87,7 +86,7 @@ func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) // SetupEtcd spawns a new etcd service and initializes it with the defaults. // The service is kept running in the background until TearDown() is called. func (topo *TopoProcess) SetupEtcd() (err error) { - topo.proc = exec.Command( + topo.proc = newCommand( topo.Binary, "--name", topo.Name, "--data-dir", topo.DataDirectory, @@ -102,27 +101,20 @@ func (topo *TopoProcess) SetupEtcd() (err error) { if err != nil && !os.IsExist(err) { return err } - errFile, err := os.Create(path.Join(topo.DataDirectory, "topo-stderr.txt")) - if err != nil { - return err - } - topo.proc.Stderr = errFile - topo.ErrorLog = errFile.Name() + topo.proc.addEnv(os.Environ()...) + topo.proc.addEnv(DefaultVttestEnv) - topo.proc.Env = append(topo.proc.Env, os.Environ()...) - topo.proc.Env = append(topo.proc.Env, DefaultVttestEnv) + log.Infof("Starting etcd with command: %v", strings.Join(topo.proc.getArgs(), " ")) - log.Infof("Starting etcd with command: %v", strings.Join(topo.proc.Args, " ")) - - err = topo.proc.Start() + err = topo.proc.start() if err != nil { return } topo.exit = make(chan error) go func() { - topo.exit <- topo.proc.Wait() + topo.exit <- topo.proc.wait() close(topo.exit) }() @@ -166,7 +158,7 @@ func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) error { topo.ZKPorts = fmt.Sprintf("%d:%d:%d", cluster.GetAndReservePort(), cluster.GetAndReservePort(), topo.Port) - topo.proc = exec.Command( + topo.proc = newCommand( topo.Binary, "--log_dir", topo.LogDirectory, "--zk.cfg", fmt.Sprintf("1@%v:%s", host, topo.ZKPorts), @@ -178,16 +170,10 @@ func (topo *TopoProcess) SetupZookeeper(cluster *LocalProcessCluster) error { log.Errorf("Failed to create log directory for zookeeper: %v", err) return err } - errFile, err := os.Create(path.Join(topo.LogDirectory, "topo-stderr.txt")) - if err != nil { - log.Errorf("Failed to create file for zookeeper stderr: %v", err) - return err - } - topo.proc.Stderr = errFile - topo.proc.Env = append(topo.proc.Env, os.Environ()...) + topo.proc.addEnv(os.Environ()...) - log.Infof("Starting zookeeper with args %v", strings.Join(topo.proc.Args, " ")) - return topo.proc.Run() + log.Infof("Starting zookeeper with args %v", strings.Join(topo.proc.getArgs(), " ")) + return topo.proc.run() } // ConsulConfigs are the configurations that are added the config files which are used by consul @@ -254,7 +240,7 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) { return } - topo.proc = exec.Command( + topo.proc = newCommand( topo.Binary, "agent", "-server", "-ui", @@ -263,24 +249,17 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) { "-config-file", configFile, ) - errFile, err := os.Create(path.Join(topo.LogDirectory, "topo-stderr.txt")) - if err != nil { - log.Errorf("Failed to create file for consul stderr: %v", err) - return - } - topo.proc.Stderr = errFile - - topo.proc.Env = append(topo.proc.Env, os.Environ()...) + topo.proc.addEnv(os.Environ()...) - log.Errorf("Starting consul with args %v", strings.Join(topo.proc.Args, " ")) - err = topo.proc.Start() + log.Errorf("Starting consul with args %v", strings.Join(topo.proc.getArgs(), " ")) + err = topo.proc.start() if err != nil { return } topo.exit = make(chan error) go func() { - topo.exit <- topo.proc.Wait() + topo.exit <- topo.proc.wait() close(topo.exit) }() @@ -321,14 +300,14 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo if keepdata { cmd = "teardown" } - topo.proc = exec.Command( + topo.proc = newCommand( topo.Binary, "--log_dir", topo.LogDirectory, "--zk.cfg", fmt.Sprintf("1@%v:%s", topo.Host, topo.ZKPorts), cmd, ) - err := topo.proc.Run() + err := topo.proc.run() if err != nil { return err } @@ -342,7 +321,7 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo } // Attempt graceful shutdown with SIGTERM first - _ = topo.proc.Process.Signal(syscall.SIGTERM) + _ = topo.proc.process().Signal(syscall.SIGTERM) if !(*keepData || keepdata) { _ = os.RemoveAll(topo.DataDirectory) @@ -356,7 +335,7 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo return nil case <-time.After(10 * time.Second): - topo.proc.Process.Kill() + topo.proc.process().Kill() err := <-topo.exit topo.proc = nil return err diff --git a/go/test/endtoend/cluster/vtbackup_process.go b/go/test/endtoend/cluster/vtbackup_process.go index 57350922a21..91028b81338 100644 --- a/go/test/endtoend/cluster/vtbackup_process.go +++ b/go/test/endtoend/cluster/vtbackup_process.go @@ -19,7 +19,6 @@ package cluster import ( "fmt" "os" - "os/exec" "path" "strings" "syscall" @@ -48,13 +47,13 @@ type VtbackupProcess struct { initialBackup bool initDBfile string - proc *exec.Cmd + proc *processInfo exit chan error } // Setup starts vtbackup process with required arguements func (vtbackup *VtbackupProcess) Setup() (err error) { - vtbackup.proc = exec.Command( + vtbackup.proc = newCommand( vtbackup.Binary, "--topo_implementation", vtbackup.CommonArg.TopoImplementation, "--topo_global_server_address", vtbackup.CommonArg.TopoGlobalAddress, @@ -73,20 +72,17 @@ func (vtbackup *VtbackupProcess) Setup() (err error) { ) if vtbackup.initialBackup { - vtbackup.proc.Args = append(vtbackup.proc.Args, "--initial_backup") + vtbackup.proc.addArgs("--initial_backup") } if vtbackup.ExtraArgs != nil { - vtbackup.proc.Args = append(vtbackup.proc.Args, vtbackup.ExtraArgs...) + vtbackup.proc.addArgs(vtbackup.ExtraArgs...) } - vtbackup.proc.Stderr = os.Stderr - vtbackup.proc.Stdout = os.Stdout + vtbackup.proc.addEnv(os.Environ()...) + vtbackup.proc.addEnv(DefaultVttestEnv) + log.Infof("Running vtbackup with args: %v", strings.Join(vtbackup.proc.getArgs(), " ")) - vtbackup.proc.Env = append(vtbackup.proc.Env, os.Environ()...) - vtbackup.proc.Env = append(vtbackup.proc.Env, DefaultVttestEnv) - log.Infof("Running vtbackup with args: %v", strings.Join(vtbackup.proc.Args, " ")) - - err = vtbackup.proc.Run() + err = vtbackup.proc.run() if err != nil { return } @@ -94,7 +90,7 @@ func (vtbackup *VtbackupProcess) Setup() (err error) { vtbackup.exit = make(chan error) go func() { if vtbackup.proc != nil { - vtbackup.exit <- vtbackup.proc.Wait() + vtbackup.exit <- vtbackup.proc.wait() close(vtbackup.exit) } }() @@ -109,7 +105,7 @@ func (vtbackup *VtbackupProcess) TearDown() error { } // Attempt graceful shutdown with SIGTERM first - vtbackup.proc.Process.Signal(syscall.SIGTERM) + vtbackup.proc.process().Signal(syscall.SIGTERM) select { case err := <-vtbackup.exit: @@ -117,7 +113,7 @@ func (vtbackup *VtbackupProcess) TearDown() error { return err case <-time.After(10 * time.Second): - vtbackup.proc.Process.Kill() + vtbackup.proc.process().Kill() err := <-vtbackup.exit vtbackup.proc = nil return err diff --git a/go/test/endtoend/cluster/vtctld_process.go b/go/test/endtoend/cluster/vtctld_process.go index 6ac6ed5d2b0..b66cfcb26a3 100644 --- a/go/test/endtoend/cluster/vtctld_process.go +++ b/go/test/endtoend/cluster/vtctld_process.go @@ -20,7 +20,6 @@ import ( "fmt" "net/http" "os" - "os/exec" "path" "strings" "syscall" @@ -45,7 +44,7 @@ type VtctldProcess struct { VerifyURL string Directory string - proc *exec.Cmd + proc *processInfo exit chan error } @@ -53,7 +52,7 @@ type VtctldProcess struct { func (vtctld *VtctldProcess) Setup(cell string, extraArgs ...string) (err error) { _ = createDirectory(vtctld.LogDir, 0700) _ = createDirectory(path.Join(vtctld.Directory, "backups"), 0700) - vtctld.proc = exec.Command( + vtctld.proc = newCommand( vtctld.Binary, "--topo_implementation", vtctld.CommonArg.TopoImplementation, "--topo_global_server_address", vtctld.CommonArg.TopoGlobalAddress, @@ -70,36 +69,29 @@ func (vtctld *VtctldProcess) Setup(cell string, extraArgs ...string) (err error) ) if *isCoverage { - vtctld.proc.Args = append(vtctld.proc.Args, "--test.coverprofile="+getCoveragePath("vtctld.out")) + vtctld.proc.addArgs("--test.coverprofile=" + getCoveragePath("vtctld.out")) } - vtctld.proc.Args = append(vtctld.proc.Args, extraArgs...) + vtctld.proc.addArgs(extraArgs...) err = os.MkdirAll(vtctld.LogDir, 0755) if err != nil { log.Errorf("cannot create log directory for vtctld: %v", err) return err } - errFile, err := os.Create(path.Join(vtctld.LogDir, "vtctld-stderr.txt")) - if err != nil { - log.Errorf("cannot create error log file for vtctld: %v", err) - return err - } - vtctld.proc.Stderr = errFile - vtctld.ErrorLog = errFile.Name() - vtctld.proc.Env = append(vtctld.proc.Env, os.Environ()...) - vtctld.proc.Env = append(vtctld.proc.Env, DefaultVttestEnv) + vtctld.proc.addEnv(os.Environ()...) + vtctld.proc.addEnv(DefaultVttestEnv) - log.Infof("Starting vtctld with command: %v", strings.Join(vtctld.proc.Args, " ")) + log.Infof("Starting vtctld with command: %v", strings.Join(vtctld.proc.getArgs(), " ")) - err = vtctld.proc.Start() + err = vtctld.proc.start() if err != nil { return } vtctld.exit = make(chan error) go func() { - vtctld.exit <- vtctld.proc.Wait() + vtctld.exit <- vtctld.proc.wait() close(vtctld.exit) }() @@ -149,7 +141,7 @@ func (vtctld *VtctldProcess) TearDown() error { } // Attempt graceful shutdown with SIGTERM first - vtctld.proc.Process.Signal(syscall.SIGTERM) + vtctld.proc.process().Signal(syscall.SIGTERM) select { case err := <-vtctld.exit: @@ -157,7 +149,7 @@ func (vtctld *VtctldProcess) TearDown() error { return err case <-time.After(10 * time.Second): - vtctld.proc.Process.Kill() + vtctld.proc.process().Kill() err := <-vtctld.exit vtctld.proc = nil return err diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index c01f7c6e93b..f4bba493f2e 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -22,7 +22,6 @@ import ( "io" "net/http" "os" - "os/exec" "path" "reflect" "strconv" @@ -62,14 +61,14 @@ type VtgateProcess struct { // Extra Args to be set before starting the vtgate process ExtraArgs []string - proc *exec.Cmd + proc *processInfo exit chan error } const defaultVtGatePlannerVersion = planbuilder.Gen4 // Setup starts Vtgate process with required arguements -func (vtgate *VtgateProcess) Setup() (err error) { +func (vtgate *VtgateProcess) Setup() error { args := []string{ "--topo_implementation", vtgate.CommonArg.TopoImplementation, "--topo_global_server_address", vtgate.CommonArg.TopoGlobalAddress, @@ -116,36 +115,28 @@ func (vtgate *VtgateProcess) Setup() (err error) { if vtgate.SysVarSetEnabled { args = append(args, "--enable_system_settings") } - vtgate.proc = exec.Command( + vtgate.proc = newCommand( vtgate.Binary, args..., ) if *isCoverage { - vtgate.proc.Args = append(vtgate.proc.Args, "--test.coverprofile="+getCoveragePath("vtgate.out")) + vtgate.proc.addArgs("--test.coverprofile=" + getCoveragePath("vtgate.out")) } - vtgate.proc.Args = append(vtgate.proc.Args, vtgate.ExtraArgs...) + vtgate.proc.addArgs(vtgate.ExtraArgs...) + vtgate.proc.addEnv(os.Environ()...) + vtgate.proc.addEnv(DefaultVttestEnv) - errFile, err := os.Create(path.Join(vtgate.LogDir, "vtgate-stderr.txt")) - if err != nil { - log.Errorf("cannot create error log file for vtgate: %v", err) - return err - } - vtgate.proc.Stderr = errFile + log.Infof("Running vtgate with command: %v", strings.Join(vtgate.proc.getArgs(), " ")) - vtgate.proc.Env = append(vtgate.proc.Env, os.Environ()...) - vtgate.proc.Env = append(vtgate.proc.Env, DefaultVttestEnv) - - log.Infof("Running vtgate with command: %v", strings.Join(vtgate.proc.Args, " ")) - - err = vtgate.proc.Start() + err := vtgate.proc.start() if err != nil { - return + return err } vtgate.exit = make(chan error) go func() { if vtgate.proc != nil { - vtgate.exit <- vtgate.proc.Wait() + vtgate.exit <- vtgate.proc.wait() close(vtgate.exit) } }() @@ -243,7 +234,7 @@ func (vtgate *VtgateProcess) Terminate() error { if vtgate.proc == nil { return nil } - return vtgate.proc.Process.Signal(syscall.SIGTERM) + return vtgate.proc.process().Signal(syscall.SIGTERM) } // TearDown shuts down the running vtgate service @@ -253,7 +244,7 @@ func (vtgate *VtgateProcess) TearDown() error { } // graceful shutdown is not currently working with vtgate, attempting a force-kill to make tests less flaky // Attempt graceful shutdown with SIGTERM first - vtgate.proc.Process.Signal(syscall.SIGTERM) + vtgate.proc.process().Signal(syscall.SIGTERM) // We are not checking vtgate's exit code because it sometimes // returns exit code 2, even though vtgate terminates cleanly. @@ -263,7 +254,7 @@ func (vtgate *VtgateProcess) TearDown() error { return nil case <-time.After(30 * time.Second): - vtgate.proc.Process.Kill() + vtgate.proc.process().Kill() err := <-vtgate.exit vtgate.proc = nil return err diff --git a/go/test/endtoend/cluster/vtorc_process.go b/go/test/endtoend/cluster/vtorc_process.go index 4fcb68e292d..632f86506bf 100644 --- a/go/test/endtoend/cluster/vtorc_process.go +++ b/go/test/endtoend/cluster/vtorc_process.go @@ -23,7 +23,6 @@ import ( "io" "net/http" "os" - "os/exec" "path" "strings" "syscall" @@ -37,15 +36,14 @@ import ( // vtorc as a separate process for testing type VTOrcProcess struct { VtctlProcess - Port int - LogDir string - LogFileName string - ExtraArgs []string - ConfigPath string - Config VTOrcConfiguration - WebPort int - proc *exec.Cmd - exit chan error + Port int + LogDir string + ExtraArgs []string + ConfigPath string + Config VTOrcConfiguration + WebPort int + proc *processInfo + exit chan error } type VTOrcConfiguration struct { @@ -106,7 +104,7 @@ func (orc *VTOrcProcess) Setup() (err error) { $ vtorc --topo_implementation etcd2 --topo_global_server_address localhost:2379 --topo_global_root /vitess/global --config config/vtorc/default.json --alsologtostderr */ - orc.proc = exec.Command( + orc.proc = newCommand( orc.Binary, "--topo_implementation", orc.TopoImplementation, "--topo_global_server_address", orc.TopoGlobalAddress, @@ -122,28 +120,18 @@ func (orc *VTOrcProcess) Setup() (err error) { ) if *isCoverage { - orc.proc.Args = append(orc.proc.Args, "--test.coverprofile="+getCoveragePath("orc.out")) + orc.proc.addArgs("--test.coverprofile=" + getCoveragePath("orc.out")) } - orc.proc.Args = append(orc.proc.Args, orc.ExtraArgs...) - orc.proc.Args = append(orc.proc.Args, "--alsologtostderr") + orc.proc.addArgs(orc.ExtraArgs...) + orc.proc.addArgs("--alsologtostderr") - if orc.LogFileName == "" { - orc.LogFileName = fmt.Sprintf("orc-stderr-%d.txt", timeNow) - } - errFile, err := os.Create(path.Join(orc.LogDir, orc.LogFileName)) - if err != nil { - log.Errorf("cannot create error log file for vtorc: %v", err) - return err - } - orc.proc.Stderr = errFile - - orc.proc.Env = append(orc.proc.Env, os.Environ()...) - orc.proc.Env = append(orc.proc.Env, DefaultVttestEnv) + orc.proc.addEnv(os.Environ()...) + orc.proc.addEnv(DefaultVttestEnv) - log.Infof("Running vtorc with command: %v", strings.Join(orc.proc.Args, " ")) + log.Infof("Running vtorc with command: %v", strings.Join(orc.proc.getArgs(), " ")) - err = orc.proc.Start() + err = orc.proc.start() if err != nil { return } @@ -151,7 +139,7 @@ func (orc *VTOrcProcess) Setup() (err error) { orc.exit = make(chan error) go func() { if orc.proc != nil { - orc.exit <- orc.proc.Wait() + orc.exit <- orc.proc.wait() close(orc.exit) } }() @@ -165,7 +153,7 @@ func (orc *VTOrcProcess) TearDown() error { return nil } // Attempt graceful shutdown with SIGTERM first - _ = orc.proc.Process.Signal(syscall.SIGTERM) + _ = orc.proc.process().Signal(syscall.SIGTERM) select { case <-orc.exit: @@ -173,7 +161,7 @@ func (orc *VTOrcProcess) TearDown() error { return nil case <-time.After(30 * time.Second): - _ = orc.proc.Process.Kill() + _ = orc.proc.process().Kill() err := <-orc.exit orc.proc = nil return err diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 6c7a85ec533..94cd66be6ca 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -26,7 +26,6 @@ import ( "io" "net/http" "os" - "os/exec" "path" "reflect" "strconv" @@ -85,13 +84,13 @@ type VttabletProcess struct { // Extra Args to be set before starting the vttablet process ExtraArgs []string - proc *exec.Cmd + proc *processInfo exit chan error } // Setup starts vttablet process with required arguements func (vttablet *VttabletProcess) Setup() (err error) { - vttablet.proc = exec.Command( + vttablet.proc = newCommand( vttablet.Binary, "--topo_implementation", vttablet.CommonArg.TopoImplementation, "--topo_global_server_address", vttablet.CommonArg.TopoGlobalAddress, @@ -116,31 +115,27 @@ func (vttablet *VttabletProcess) Setup() (err error) { ) if *isCoverage { - vttablet.proc.Args = append(vttablet.proc.Args, "--test.coverprofile="+getCoveragePath("vttablet.out")) + vttablet.proc.addArgs("--test.coverprofile=" + getCoveragePath("vttablet.out")) } if *PerfTest { - vttablet.proc.Args = append(vttablet.proc.Args, "--pprof", fmt.Sprintf("cpu,waitSig,path=vttablet_pprof_%s", vttablet.Name)) + vttablet.proc.addArgs("--pprof", fmt.Sprintf("cpu,waitSig,path=vttablet_pprof_%s", vttablet.Name)) } if vttablet.SupportsBackup { - vttablet.proc.Args = append(vttablet.proc.Args, "--restore_from_backup") + vttablet.proc.addArgs("--restore_from_backup") } if vttablet.DbFlavor != "" { - vttablet.proc.Args = append(vttablet.proc.Args, fmt.Sprintf("--db_flavor=%s", vttablet.DbFlavor)) + vttablet.proc.addArgs(fmt.Sprintf("--db_flavor=%s", vttablet.DbFlavor)) } - vttablet.proc.Args = append(vttablet.proc.Args, vttablet.ExtraArgs...) - fname := path.Join(vttablet.LogDir, vttablet.TabletPath+"-vttablet-stderr.txt") - errFile, _ := os.Create(fname) - vttablet.proc.Stderr = errFile - vttablet.ErrorLog = errFile.Name() + vttablet.proc.addArgs(vttablet.ExtraArgs...) - vttablet.proc.Env = append(vttablet.proc.Env, os.Environ()...) - vttablet.proc.Env = append(vttablet.proc.Env, DefaultVttestEnv) + vttablet.proc.addEnv(os.Environ()...) + vttablet.proc.addEnv(DefaultVttestEnv) - log.Infof("Running vttablet with command: %v", strings.Join(vttablet.proc.Args, " ")) + log.Infof("Running vttablet with command: %v", strings.Join(vttablet.proc.getArgs(), " ")) - err = vttablet.proc.Start() + err = vttablet.proc.start() if err != nil { return } @@ -150,7 +145,7 @@ func (vttablet *VttabletProcess) Setup() (err error) { vttablet.exit = make(chan error) go func() { if vttablet.proc != nil { - vttablet.exit <- vttablet.proc.Wait() + vttablet.exit <- vttablet.proc.wait() close(vttablet.exit) } }() @@ -165,10 +160,6 @@ func (vttablet *VttabletProcess) Setup() (err error) { servingStatus = append(servingStatus, "SERVING", "NOT_SERVING") } if err = vttablet.WaitForTabletStatuses(servingStatus); err != nil { - errFileContent, _ := os.ReadFile(fname) - if errFileContent != nil { - log.Infof("vttablet error:\n%s\n", string(errFileContent)) - } return fmt.Errorf("process '%s' timed out after 10s (err: %s)", vttablet.Name, err) } } @@ -414,7 +405,7 @@ func (vttablet *VttabletProcess) Kill() error { if vttablet.proc == nil || vttablet.exit == nil { return nil } - vttablet.proc.Process.Kill() + vttablet.proc.process().Kill() err := <-vttablet.exit vttablet.proc = nil return err @@ -427,7 +418,7 @@ func (vttablet *VttabletProcess) TearDownWithTimeout(timeout time.Duration) erro return nil } // Attempt graceful shutdown with SIGTERM first - vttablet.proc.Process.Signal(syscall.SIGTERM) + vttablet.proc.process().Signal(syscall.SIGTERM) select { case <-vttablet.exit: diff --git a/go/test/endtoend/cluster/vttablet_process_unix.go b/go/test/endtoend/cluster/vttablet_process_unix.go index 3f5c76e9988..f63aa66b1f0 100644 --- a/go/test/endtoend/cluster/vttablet_process_unix.go +++ b/go/test/endtoend/cluster/vttablet_process_unix.go @@ -22,5 +22,5 @@ import "syscall" // ToggleProfiling enables or disables the configured CPU profiler on this vttablet func (vttablet *VttabletProcess) ToggleProfiling() error { - return vttablet.proc.Process.Signal(syscall.SIGUSR1) + return vttablet.proc.process().Signal(syscall.SIGUSR1) } diff --git a/go/test/endtoend/vtgate/queries/derived/main_test.go b/go/test/endtoend/vtgate/queries/derived/main_test.go index 3b44811f95c..885a380f05b 100644 --- a/go/test/endtoend/vtgate/queries/derived/main_test.go +++ b/go/test/endtoend/vtgate/queries/derived/main_test.go @@ -54,6 +54,7 @@ func TestMain(m *testing.M) { // Start topo server err := clusterInstance.StartTopo() if err != nil { + fmt.Printf("Error starting topo server: %v\n", err.Error()) return 1 } @@ -65,6 +66,7 @@ func TestMain(m *testing.M) { } err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 0, false) if err != nil { + fmt.Printf("Error starting keyspace: %v\n", err.Error()) return 1 } @@ -72,6 +74,7 @@ func TestMain(m *testing.M) { // Start vtgate err = clusterInstance.StartVtgate() if err != nil { + fmt.Printf("Error starting vtgate server: %v\n", err.Error()) return 1 } @@ -83,7 +86,7 @@ func TestMain(m *testing.M) { // create mysql instance and connection parameters conn, closer, err := utils.NewMySQL(clusterInstance, keyspaceName, schemaSQL) if err != nil { - fmt.Println(err) + fmt.Printf("Error starting mysql server: %v\n", err.Error()) return 1 } defer closer()