diff --git a/Makefile b/Makefile index f5ca261..65a48ec 100644 --- a/Makefile +++ b/Makefile @@ -2,20 +2,10 @@ default: build all: build -build: agent chaos verifier - -agent: - go build -o bin/chaos-agent cmd/agent/main.go +build: chaos verifier chaos: go build -o bin/chaos-tidb cmd/tidb/main.go verifier: - go build -o bin/chaos-verifier cmd/verifier/main.go - -update: - which glide >/dev/null || curl https://glide.sh/get | sh - which glide-vc || go get -v -u github.com/sgotti/glide-vc - glide update --strip-vendor --skip-test - @echo "removing test files" - glide vc --only-code --no-tests \ No newline at end of file + go build -o bin/chaos-verifier cmd/verifier/main.go \ No newline at end of file diff --git a/README.md b/README.md index 5a138bf..086167b 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,19 @@ Chaos is inspired by [jepsen](https://github.com/jepsen-io/jepsen) and uses [por ## Architecture -Chaos runs your registered database on 5 nodes, and starts an agent in every node too. The agent will -receive the command sent from the controller to control the service, like starting/stoping the service, -or using a nemesis to disturb the whole cluster. +Chaos runs your registered database on 5 nodes and sends commands through `ssh` to control the service, like starting/stopping the service, or using a nemesis to disturb the whole cluster. 
-![Architecture](./chaos.jpg) +``` + +-------------+ + +------- | controller | -------+ + | +-------------+ | + | | | | | + | +----+ | | | + v v | | v ++----+----+----+ | | +----+----+ +| n1 | n2 | n3 | <+ +> | n4 | n5 | ++----+----+----+ +----+----+ +``` ## Usage @@ -27,10 +35,6 @@ In another shell, use `docker exec -it chaos-control bash` to enter the controll # build the node and your own chaos test make -# deploy and start node agent -./scripts/deploy_agent.sh -./scripts/start_agent.sh - # run you own chaos like ./bin/chaos-tidb ``` diff --git a/chaos.jpg b/chaos.jpg deleted file mode 100644 index 245d46d..0000000 Binary files a/chaos.jpg and /dev/null differ diff --git a/cmd/agent/main.go b/cmd/agent/main.go deleted file mode 100644 index 69e56b8..0000000 --- a/cmd/agent/main.go +++ /dev/null @@ -1,52 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "os" - "os/signal" - "syscall" - - "github.com/siddontang/chaos/pkg/node" - - // register nemesis - _ "github.com/siddontang/chaos/pkg/nemesis" - - // register tidb - _ "github.com/siddontang/chaos/db/tidb" -) - -var ( - nodeAddr = flag.String("addr", ":8080", "node address") - logFile = flag.String("log-file", "/root/node.log", "node log file") -) - -func main() { - flag.Parse() - - f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) - if err != nil { - fmt.Printf("open log file failed %v", err) - os.Exit(1) - } - defer f.Close() - - log.SetOutput(f) - - log.Printf("begin to listen %s", *nodeAddr) - agent := node.NewAgent(*nodeAddr) - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - go func() { - agent.Run() - }() - - <-sigs - log.Printf("closing node") - - agent.Close() - - log.Printf("node is closed") -} diff --git a/cmd/tidb/main.go b/cmd/tidb/main.go index 7964763..77d9a35 100644 --- a/cmd/tidb/main.go +++ b/cmd/tidb/main.go @@ -18,13 +18,12 @@ import ( ) var ( - nodePort = flag.Int("node-port", 8080, "node port") requestCount = 
flag.Int("request-count", 500, "client test request count") runTime = flag.Duration("run-time", 10*time.Minute, "client test run time") clientCase = flag.String("case", "bank", "client test case, like bank") historyFile = flag.String("history", "./history.log", "history file") nemesises = flag.String("nemesis", "", "nemesis, seperated by name, like random_kill,all_kill") - verifyNames = flag.String("verifiers", "", "verifier names, seperate by comma, tidb-bank,tidb-bank-tso") + verifyNames = flag.String("verifiers", "", "verifier names, seperate by comma, tidb_bank,tidb_bank_tso") ) func main() { @@ -32,7 +31,6 @@ func main() { cfg := &control.Config{ DB: "tidb", - NodePort: *nodePort, RequestCount: *requestCount, RunTime: *runTime, History: *historyFile, diff --git a/cmd/verifier/verify/util.go b/cmd/verifier/verify/util.go index 00f78a8..477515e 100644 --- a/cmd/verifier/verify/util.go +++ b/cmd/verifier/verify/util.go @@ -17,10 +17,12 @@ func Verify(ctx context.Context, historyFile string, verfier_names string) { for _, name := range strings.Split(verfier_names, ",") { var verifier history.Verifier switch name { - case "tidb-bank": + case "tidb_bank": verifier = tidb.BankVerifier{} - case "tidb-bank-tso": + case "tidb_bank_tso": verifier = tidb.BankTsoVerifier{} + case "": + continue default: log.Printf("%s is not supported", name) continue diff --git a/db/tidb/db.go b/db/tidb/db.go index d12c784..ab9ec8e 100644 --- a/db/tidb/db.go +++ b/db/tidb/db.go @@ -3,16 +3,15 @@ package tidb import ( "context" "fmt" - "io/ioutil" "log" - "os" - "os/exec" "path" + "strconv" "strings" "time" "github.com/siddontang/chaos/pkg/core" "github.com/siddontang/chaos/pkg/util" + "github.com/siddontang/chaos/pkg/util/ssh" ) const ( @@ -41,20 +40,21 @@ type db struct { // SetUp initializes the database. 
func (db *db) SetUp(ctx context.Context, nodes []string, node string) error { // Try kill all old servers - exec.CommandContext(ctx, "killall", "-9", "tidb-server").Run() - exec.CommandContext(ctx, "killall", "-9", "tikv-server").Run() - exec.CommandContext(ctx, "killall", "-9", "pd-server").Run() + ssh.Exec(ctx, node, "killall", "-9", "tidb-server") + ssh.Exec(ctx, node, "killall", "-9", "tikv-server") + ssh.Exec(ctx, node, "killall", "-9", "pd-server") db.nodes = nodes - if err := util.InstallArchive(ctx, archiveURL, deployDir); err != nil { + log.Printf("install archieve on node %s", node) + if err := util.InstallArchive(ctx, node, archiveURL, deployDir); err != nil { return err } - os.MkdirAll(path.Join(deployDir, "conf"), 0755) - os.MkdirAll(path.Join(deployDir, "log"), 0755) + util.Mkdir(ctx, node, path.Join(deployDir, "conf")) + util.Mkdir(ctx, node, path.Join(deployDir, "log")) - if err := ioutil.WriteFile(pdConfig, []byte("[replication]\nmax-replicas=5"), 0644); err != nil { + if err := util.WriteFile(ctx, node, pdConfig, strconv.Quote("[replication]\nmax-replicas=5")); err != nil { return err } @@ -68,7 +68,7 @@ func (db *db) SetUp(ctx context.Context, nodes []string, node string) error { "raft_election_timeout_ticks=10", } - if err := ioutil.WriteFile(tikvConfig, []byte(strings.Join(tikvCfs, "\n")), 0644); err != nil { + if err := util.WriteFile(ctx, node, tikvConfig, strconv.Quote(strings.Join(tikvCfs, "\n"))); err != nil { return err } @@ -86,6 +86,8 @@ func (db *db) Start(ctx context.Context, node string) error { } func (db *db) start(ctx context.Context, node string, inSetUp bool) error { + log.Printf("start database on node %s", node) + initialClusterArgs := make([]string, len(db.nodes)) for i, n := range db.nodes { initialClusterArgs[i] = fmt.Sprintf("%s=http://%s:2380", n, n) @@ -105,7 +107,7 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { log.Printf("start pd-server on node %s", node) pdPID := path.Join(deployDir, 
"pd.pid") opts := util.NewDaemonOptions(deployDir, pdPID) - if err := util.StartDaemon(ctx, opts, pdBinary, pdArgs...); err != nil { + if err := util.StartDaemon(ctx, node, opts, pdBinary, pdArgs...); err != nil { return err } @@ -113,7 +115,7 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { time.Sleep(5 * time.Second) } - if !util.IsDaemonRunning(ctx, pdBinary, pdPID) { + if !util.IsDaemonRunning(ctx, node, pdBinary, pdPID) { return fmt.Errorf("fail to start pd on node %s", node) } @@ -134,7 +136,7 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { log.Printf("start tikv-server on node %s", node) tikvPID := path.Join(deployDir, "tikv.pid") opts = util.NewDaemonOptions(deployDir, tikvPID) - if err := util.StartDaemon(ctx, opts, tikvBinary, tikvArgs...); err != nil { + if err := util.StartDaemon(ctx, node, opts, tikvBinary, tikvArgs...); err != nil { return err } @@ -142,7 +144,7 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { time.Sleep(30 * time.Second) } - if !util.IsDaemonRunning(ctx, tikvBinary, tikvPID) { + if !util.IsDaemonRunning(ctx, node, tikvBinary, tikvPID) { return fmt.Errorf("fail to start tikv on node %s", node) } @@ -155,7 +157,7 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { log.Printf("start tidb-erver on node %s", node) tidbPID := path.Join(deployDir, "tidb.pid") opts = util.NewDaemonOptions(deployDir, tidbPID) - if err := util.StartDaemon(ctx, opts, tidbBinary, tidbArgs...); err != nil { + if err := util.StartDaemon(ctx, node, opts, tidbBinary, tidbArgs...); err != nil { return err } @@ -163,7 +165,7 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { time.Sleep(30 * time.Second) } - if !util.IsDaemonRunning(ctx, tidbBinary, tidbPID) { + if !util.IsDaemonRunning(ctx, node, tidbBinary, tidbPID) { return fmt.Errorf("fail to start tidb on node %s", node) } @@ -172,41 +174,33 @@ func (db *db) start(ctx 
context.Context, node string, inSetUp bool) error { // Stop stops the database func (db *db) Stop(ctx context.Context, node string) error { - if err := util.StopDaemon(ctx, tidbBinary, path.Join(deployDir, "tidb.pid")); err != nil { + if err := util.StopDaemon(ctx, node, tidbBinary, path.Join(deployDir, "tidb.pid")); err != nil { return err } - if err := util.StopDaemon(ctx, tikvBinary, path.Join(deployDir, "tikv.pid")); err != nil { + if err := util.StopDaemon(ctx, node, tikvBinary, path.Join(deployDir, "tikv.pid")); err != nil { return err } - if err := util.StopDaemon(ctx, pdBinary, path.Join(deployDir, "pd.pid")); err != nil { - return err - } - - return nil + return util.StopDaemon(ctx, node, pdBinary, path.Join(deployDir, "pd.pid")) } // Kill kills the database func (db *db) Kill(ctx context.Context, node string) error { - if err := util.KillDaemon(ctx, tidbBinary, path.Join(deployDir, "tidb.pid")); err != nil { + if err := util.KillDaemon(ctx, node, tidbBinary, path.Join(deployDir, "tidb.pid")); err != nil { return err } - if err := util.KillDaemon(ctx, tikvBinary, path.Join(deployDir, "tikv.pid")); err != nil { + if err := util.KillDaemon(ctx, node, tikvBinary, path.Join(deployDir, "tikv.pid")); err != nil { return err } - if err := util.KillDaemon(ctx, pdBinary, path.Join(deployDir, "pd.pid")); err != nil { - return err - } - - return nil + return util.KillDaemon(ctx, node, pdBinary, path.Join(deployDir, "pd.pid")) } // IsRunning checks whether the database is running or not func (db *db) IsRunning(ctx context.Context, node string) bool { - return util.IsDaemonRunning(ctx, tidbBinary, path.Join(deployDir, "tidb.pid")) + return util.IsDaemonRunning(ctx, node, tidbBinary, path.Join(deployDir, "tidb.pid")) } // Name returns the unique name for the database diff --git a/docker/control/bashrc b/docker/control/bashrc index 489f92e..9befed9 100644 --- a/docker/control/bashrc +++ b/docker/control/bashrc @@ -8,8 +8,6 @@ You are currently in the base dir of the git 
repo for Chaos. To run a test: make - ./script/deploy_agent.sh - ./script/start_agent.sh ./bin/chaos-tidb EOF diff --git a/pkg/control/config.go b/pkg/control/config.go index 4843bf8..d20ca76 100644 --- a/pkg/control/config.go +++ b/pkg/control/config.go @@ -6,9 +6,7 @@ import ( // Config is the configuration for the controller. type Config struct { - // NodePort is used to communicate with the node server. - NodePort int - // DB is the name which we want to run, you must register the db in the node before. + // DB is the name which we want to run. DB string // RequestCount controls how many requests a client sends to the db RequestCount int @@ -16,7 +14,7 @@ type Config struct { RunTime time.Duration // History file - History string + History string } func (c *Config) adjust() { diff --git a/pkg/control/control.go b/pkg/control/control.go index 7885493..c9f07e0 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -6,10 +6,16 @@ import ( "log" "sync" "sync/atomic" + "time" "github.com/siddontang/chaos/pkg/core" "github.com/siddontang/chaos/pkg/history" - "github.com/siddontang/chaos/pkg/node" + + // register nemesis + _ "github.com/siddontang/chaos/pkg/nemesis" + + // register tidb + _ "github.com/siddontang/chaos/db/tidb" ) // Controller controls the whole cluster. 
It sends request to the database, @@ -18,8 +24,7 @@ import ( type Controller struct { cfg *Config - nodes []string - nodeClients []*node.Client + nodes []string clients []core.Client @@ -41,6 +46,10 @@ func NewController(cfg *Config, clientCreator core.ClientCreator, nemesisGenerat log.Fatalf("empty database") } + if db := core.GetDB(cfg.DB); db == nil { + log.Fatalf("database %s is not registered", cfg.DB) + } + r, err := history.NewRecorder(cfg.History) if err != nil { log.Fatalf("prepare history failed %v", err) @@ -55,8 +64,6 @@ func NewController(cfg *Config, clientCreator core.ClientCreator, nemesisGenerat for i := 1; i <= 5; i++ { name := fmt.Sprintf("n%d", i) c.nodes = append(c.nodes, name) - client := node.NewClient(name, fmt.Sprintf("%s:%d", name, cfg.NodePort)) - c.nodeClients = append(c.nodeClients, client) c.clients = append(c.clients, clientCreator.Create(name)) } @@ -118,9 +125,10 @@ func (c *Controller) syncExec(f func(i int)) { func (c *Controller) setUpDB() { log.Printf("begin to set up database") c.syncExec(func(i int) { - client := c.nodeClients[i] log.Printf("begin to set up database on %s", c.nodes[i]) - if err := client.SetUpDB(c.cfg.DB, c.nodes); err != nil { + db := core.GetDB(c.cfg.DB) + err := db.SetUp(c.ctx, c.nodes, c.nodes[i]) + if err != nil { log.Fatalf("setup db %s at node %s failed %v", c.cfg.DB, c.nodes[i], err) } }) @@ -129,9 +137,9 @@ func (c *Controller) setUpDB() { func (c *Controller) tearDownDB() { log.Printf("begin to tear down database") c.syncExec(func(i int) { - client := c.nodeClients[i] log.Printf("being to tear down database on %s", c.nodes[i]) - if err := client.TearDownDB(c.cfg.DB, c.nodes); err != nil { + db := core.GetDB(c.cfg.DB) + if err := db.TearDown(c.ctx, c.nodes, c.nodes[i]); err != nil { log.Printf("tear down db %s at node %s failed %v", c.cfg.DB, c.nodes[i], err) } }) @@ -228,11 +236,24 @@ func (c *Controller) onNemesisLoop(ctx context.Context, index int, op *core.Neme return } - nodeClient := 
c.nodeClients[index] + nemesis := core.GetNemesis(op.Name) + if nemesis == nil { + log.Printf("nemesis %s is not registered", op.Name) + return + } + node := c.nodes[index] log.Printf("run nemesis %s on %s", op.Name, node) - if err := nodeClient.RunNemesis(op); err != nil { + if err := nemesis.Invoke(ctx, node, op.InvokeArgs...); err != nil { + log.Printf("run nemesis %s on %s failed: %v", op.Name, node, err) + } + + select { + case <-time.After(op.RunTime): + case <-ctx.Done(): + } + if err := nemesis.Recover(ctx, node, op.RecoverArgs...); err != nil { log.Printf("run nemesis %s on %s failed: %v", op.Name, node, err) } } diff --git a/pkg/control/control_test.go b/pkg/control/control_test.go index 1b25c55..6044722 100644 --- a/pkg/control/control_test.go +++ b/pkg/control/control_test.go @@ -1,6 +1,7 @@ package control import ( + "os" "testing" "time" @@ -11,12 +12,14 @@ func TestControl(t *testing.T) { t.Log("test can only be run in the chaos docker") cfg := &Config{ - NodePort: 8080, RequestCount: 10, RunTime: 10 * time.Second, DB: "noop", + History: "/tmp/chaos/a.log", } + defer os.Remove("/tmp/chaos/a.log") + c := NewController(cfg, core.NoopClientCreator{}, []core.NemesisGenerator{ core.NoopNemesisGenerator{}, }) diff --git a/pkg/core/db.go b/pkg/core/db.go index 06ecbc3..e7a80ff 100644 --- a/pkg/core/db.go +++ b/pkg/core/db.go @@ -6,7 +6,6 @@ import ( ) // DB allows Chaos to set up and tear down database. -// DB is used in node, you should define your own DB and register it. type DB interface { // SetUp initializes the database. SetUp(ctx context.Context, nodes []string, node string) error diff --git a/pkg/core/nemesis.go b/pkg/core/nemesis.go index 46bcb41..ecb3669 100644 --- a/pkg/core/nemesis.go +++ b/pkg/core/nemesis.go @@ -7,7 +7,6 @@ import ( ) // Nemesis injects failure and disturbs the database. -// Nemesis is used in node, you can define your own nemesis and register it. 
type Nemesis interface { // // SetUp initializes the nemesis // SetUp(ctx context.Context, node string) error diff --git a/pkg/history/history.go b/pkg/history/history.go index 14a2f35..6ca9881 100644 --- a/pkg/history/history.go +++ b/pkg/history/history.go @@ -5,6 +5,7 @@ import ( "encoding/json" "log" "os" + "path" "sync" "github.com/anishathalye/porcupine" @@ -30,6 +31,8 @@ type Recorder struct { // NewRecorder creates a recorder to log the history to the file. func NewRecorder(name string) (*Recorder, error) { + os.MkdirAll(path.Dir(name), 0755) + f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) if err != nil { return nil, err diff --git a/pkg/nemesis/nemesis.go b/pkg/nemesis/nemesis.go index 59cfd2b..0cd4012 100644 --- a/pkg/nemesis/nemesis.go +++ b/pkg/nemesis/nemesis.go @@ -34,7 +34,7 @@ func (n drop) Invoke(ctx context.Context, node string, args ...string) error { continue } - if err := n.t.Drop(ctx, dropNode); err != nil { + if err := n.t.Drop(ctx, node, dropNode); err != nil { return err } } @@ -42,7 +42,7 @@ func (n drop) Invoke(ctx context.Context, node string, args ...string) error { } func (n drop) Recover(ctx context.Context, node string, args ...string) error { - return n.t.Heal(ctx) + return n.t.Heal(ctx, node) } func (drop) Name() string { diff --git a/pkg/node/agent.go b/pkg/node/agent.go deleted file mode 100644 index 071b787..0000000 --- a/pkg/node/agent.go +++ /dev/null @@ -1,88 +0,0 @@ -package node - -import ( - "context" - "net/http" - "sync" - "time" - - "github.com/gorilla/mux" - "github.com/unrolled/render" - "github.com/urfave/negroni" -) - -const apiPrefix = "/agent" - -// Agent is the agent to let the controller to control the node. 
-type Agent struct { - addr string - s *http.Server - ctx context.Context - cancel context.CancelFunc - - dbLock sync.Mutex - nemesisLock sync.Mutex -} - -// NewAgent creates the agent with given address -func NewAgent(addr string) *Agent { - agent := &Agent{ - addr: addr, - } - - agent.ctx, agent.cancel = context.WithCancel(context.Background()) - return agent -} - -// Run runs the agent API server -func (agent *Agent) Run() error { - agent.s = &http.Server{ - Addr: agent.addr, - Handler: agent.createHandler(), - } - return agent.s.ListenAndServe() -} - -// Close closes the agent. -func (agent *Agent) Close() { - agent.cancel() - if agent.s != nil { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - agent.s.Shutdown(ctx) - cancel() - } -} - -func (agent *Agent) createHandler() http.Handler { - engine := negroni.New() - recover := negroni.NewRecovery() - engine.Use(recover) - - router := mux.NewRouter() - subRouter := agent.createRouter() - router.PathPrefix(apiPrefix).Handler( - negroni.New(negroni.Wrap(subRouter)), - ) - - engine.UseHandler(router) - return engine -} - -func (agent *Agent) createRouter() *mux.Router { - rd := render.New(render.Options{ - IndentJSON: true, - }) - - router := mux.NewRouter().PathPrefix(apiPrefix).Subrouter() - - nemesisHandler := newNemesisHandler(agent, rd) - // router.HandleFunc("/nemesis/{name}/setup", nemesisHandler.SetUp).Methods("POST") - // router.HandleFunc("/nemesis/{name}/teardown", nemesisHandler.TearDown).Methods("POST") - router.HandleFunc("/nemesis/{name}/run", nemesisHandler.Run).Methods("POST") - - dbHandler := newDBHanlder(agent, rd) - router.HandleFunc("/db/{name}/setup", dbHandler.SetUp).Methods("POST") - router.HandleFunc("/db/{name}/teardown", dbHandler.TearDown).Methods("POST") - - return router -} diff --git a/pkg/node/client.go b/pkg/node/client.go deleted file mode 100644 index 44f9d6b..0000000 --- a/pkg/node/client.go +++ /dev/null @@ -1,116 +0,0 @@ -package node - -import ( - 
"bytes" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "strings" - - "github.com/siddontang/chaos/pkg/core" -) - -// Client is used to communicate with the agent server on the node. -type Client struct { - node string - addr string - client *http.Client -} - -// NewClient creates the client. -func NewClient(nodeName string, addr string) *Client { - return &Client{ - node: nodeName, - addr: addr, - client: http.DefaultClient, - } -} - -// Node returns the node name. -func (c *Client) Node() string { - return c.node -} - -func (c *Client) getURLPrefix() string { - return fmt.Sprintf("http://%s%s", c.addr, apiPrefix) -} - -func (c *Client) doPost(suffix string, args url.Values, data []byte) error { - if args == nil { - args = url.Values{} - } - args.Set("node", c.node) - url := fmt.Sprintf("%s%s?%s", c.getURLPrefix(), suffix, args.Encode()) - resp, err := c.client.Post(url, "application/json", bytes.NewBuffer(data)) - if err != nil { - return err - } - - defer resp.Body.Close() - data, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - if resp.StatusCode%200 != 0 { - return fmt.Errorf("%s:%s", resp.Status, data) - } - - return nil -} - -// SetUpDB is to set up the db -func (c *Client) SetUpDB(name string, nodes []string) error { - v := url.Values{} - v.Set("nodes", strings.Join(nodes, ",")) - - return c.doPost(fmt.Sprintf("/db/%s/setup", name), v, nil) -} - -// TearDownDB tears down db -func (c *Client) TearDownDB(name string, nodes []string) error { - v := url.Values{} - v.Set("nodes", strings.Join(nodes, ",")) - - return c.doPost(fmt.Sprintf("/db/%s/teardown", name), v, nil) -} - -// StartDB starts db -func (c *Client) StartDB(name string) error { - return c.doPost(fmt.Sprintf("/db/%s/start", name), nil, nil) -} - -// StopDB stops db -func (c *Client) StopDB(name string) error { - return c.doPost(fmt.Sprintf("/db/%s/stop", name), nil, nil) -} - -// KillDB kills db -func (c *Client) KillDB(name string) error { - return 
c.doPost(fmt.Sprintf("/db/%s/kill", name), nil, nil) -} - -// IsDBRunning checks db is running -func (c *Client) IsDBRunning(name string) bool { - return c.doPost(fmt.Sprintf("/db/%s/is_running", name), nil, nil) == nil -} - -// RunNemesis runs nemesis -func (c *Client) RunNemesis(op *core.NemesisOperation) error { - v := url.Values{} - suffix := fmt.Sprintf("/nemesis/%s/run", op.Name) - if op.RunTime > 0 { - v.Set("dur", op.RunTime.String()) - } - - if len(op.InvokeArgs) > 0 { - v.Set("invoke_args", strings.Join(op.InvokeArgs, ",")) - } - - if len(op.RecoverArgs) > 0 { - v.Set("recover_args", strings.Join(op.RecoverArgs, ",")) - } - - return c.doPost(suffix, v, nil) -} diff --git a/pkg/node/db.go b/pkg/node/db.go deleted file mode 100644 index b290ba8..0000000 --- a/pkg/node/db.go +++ /dev/null @@ -1,79 +0,0 @@ -package node - -import ( - "fmt" - "log" - "net/http" - "strings" - - "github.com/gorilla/mux" - "github.com/siddontang/chaos/pkg/core" - "github.com/unrolled/render" -) - -type dbHandler struct { - agent *Agent - rd *render.Render -} - -func newDBHanlder(agent *Agent, rd *render.Render) *dbHandler { - return &dbHandler{ - agent: agent, - rd: rd, - } -} - -func (h *dbHandler) getDB(w http.ResponseWriter, vars map[string]string) core.DB { - name := vars["name"] - db := core.GetDB(name) - if db == nil { - h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("db %s is not registered", name)) - return nil - } - return db -} - -func (h *dbHandler) SetUp(w http.ResponseWriter, r *http.Request) { - h.agent.dbLock.Lock() - defer h.agent.dbLock.Unlock() - - vars := mux.Vars(r) - db := h.getDB(w, vars) - if db == nil { - return - } - node := r.FormValue("node") - nodes := strings.Split(r.FormValue("nodes"), ",") - - log.Printf("set up db %s on node %s", db.Name(), node) - if err := db.SetUp(h.agent.ctx, nodes, node); err != nil { - log.Panicf("set up db %s failed %v", db.Name(), err) - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - h.rd.JSON(w, 
http.StatusOK, nil) -} - -func (h *dbHandler) TearDown(w http.ResponseWriter, r *http.Request) { - h.agent.dbLock.Lock() - defer h.agent.dbLock.Unlock() - - vars := mux.Vars(r) - db := h.getDB(w, vars) - if db == nil { - return - } - - node := r.FormValue("node") - nodes := strings.Split(r.FormValue("nodes"), ",") - - log.Printf("tear down db %s on node %s", db.Name(), node) - if err := db.TearDown(h.agent.ctx, nodes, node); err != nil { - log.Panicf("tear down db %s failed %v", db.Name(), err) - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - h.rd.JSON(w, http.StatusOK, nil) -} diff --git a/pkg/node/nemesis.go b/pkg/node/nemesis.go deleted file mode 100644 index 3beaf1e..0000000 --- a/pkg/node/nemesis.go +++ /dev/null @@ -1,74 +0,0 @@ -package node - -import ( - "fmt" - "log" - "math/rand" - "net/http" - "strings" - "time" - - "github.com/gorilla/mux" - "github.com/siddontang/chaos/pkg/core" - "github.com/unrolled/render" -) - -type nemesisHandler struct { - agent *Agent - rd *render.Render -} - -func newNemesisHandler(agent *Agent, rd *render.Render) *nemesisHandler { - return &nemesisHandler{ - agent: agent, - rd: rd, - } -} - -func (h *nemesisHandler) getNemesis(w http.ResponseWriter, vars map[string]string) core.Nemesis { - name := vars["name"] - nemesis := core.GetNemesis(name) - if nemesis == nil { - h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("nemesis %s is not registered", name)) - return nil - } - return nemesis -} - -func (h *nemesisHandler) Run(w http.ResponseWriter, r *http.Request) { - h.agent.nemesisLock.Lock() - defer h.agent.nemesisLock.Unlock() - - vars := mux.Vars(r) - nemesis := h.getNemesis(w, vars) - if nemesis == nil { - return - } - - node := r.FormValue("node") - invokeArgs := strings.Split(r.FormValue("invoke_args"), ",") - recoverArgs := strings.Split(r.FormValue("recover_args"), ",") - runTime, _ := time.ParseDuration(r.FormValue("dur")) - if runTime == 0 { - runTime = time.Second * 
time.Duration(rand.Intn(10)+1) - } - - log.Printf("invoke nemesis %s with %v on node %s", nemesis.Name(), invokeArgs, node) - - defer func() { - log.Printf("recover nemesis %s with %v on node %s", nemesis.Name(), recoverArgs, node) - nemesis.Recover(h.agent.ctx, node, recoverArgs...) - }() - - if err := nemesis.Invoke(h.agent.ctx, node, invokeArgs...); err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - select { - case <-h.agent.ctx.Done(): - case <-time.After(runTime): - } - - h.rd.JSON(w, http.StatusOK, nil) -} diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go deleted file mode 100644 index 9d0877e..0000000 --- a/pkg/node/node_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package node - -import ( - "net" - "testing" - "time" - - "github.com/siddontang/chaos/pkg/core" -) - -func getNodeAddr(t *testing.T) string { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatalf("listen failed %v", err) - } - addr := l.Addr().String() - l.Close() - - return addr -} -func TestNodeHandler(t *testing.T) { - addr := getNodeAddr(t) - agent := NewAgent(addr) - defer agent.Close() - - client := NewClient("n0", addr) - - go func() { - agent.Run() - }() - - time.Sleep(time.Second) - - nodes := []string{"n0"} - if err := client.SetUpDB("noop", nodes); err != nil { - t.Fatalf("setup db failed %v", err) - } - - if err := client.StartDB("noop"); err != nil { - t.Fatalf("start db failed %v", err) - } - - if !client.IsDBRunning("noop") { - t.Fatalf("db must be running") - } - - if err := client.StopDB("noop"); err != nil { - t.Fatalf("stop db failed %v", err) - } - - if err := client.KillDB("noop"); err != nil { - t.Fatalf("kill db failed %v", err) - } - - if err := client.TearDownDB("noop", nodes); err != nil { - t.Fatalf("tear down db failed %v", err) - } - - if err := client.RunNemesis(&core.NemesisOperation{ - Name: "noop", - InvokeArgs: nil, - RecoverArgs: nil, - RunTime: 0, - }); err != nil { - t.Fatalf("start nemesis failed 
%v", err) - } -} diff --git a/pkg/util/net/iptables.go b/pkg/util/net/iptables.go index 5712a24..a8d42f8 100644 --- a/pkg/util/net/iptables.go +++ b/pkg/util/net/iptables.go @@ -3,9 +3,10 @@ package net import ( "context" "fmt" - "os/exec" "strings" "time" + + "github.com/siddontang/chaos/pkg/util/ssh" ) // IPTables implements Net interface to simulate the network. @@ -14,35 +15,35 @@ type IPTables struct { } // Drop drops traffic from node. -func (IPTables) Drop(ctx context.Context, node string) error { - return exec.CommandContext(ctx, "iptables", "-A", "INPUT", "-s", HostIP(node), "-j", "DROP", "-w").Run() +func (IPTables) Drop(ctx context.Context, node string, srcNode string) error { + return ssh.Exec(ctx, node, "iptables", "-A", "INPUT", "-s", HostIP(srcNode), "-j", "DROP", "-w") } // Heal ends all traffic drops and restores network to fast operations. -func (IPTables) Heal(ctx context.Context) error { - if err := exec.CommandContext(ctx, "iptables", "-F", "-w").Run(); err != nil { +func (IPTables) Heal(ctx context.Context, node string) error { + if err := ssh.Exec(ctx, node, "iptables", "-F", "-w"); err != nil { return err } - return exec.CommandContext(ctx, "iptables", "-X", "-w").Run() + return ssh.Exec(ctx, node, "iptables", "-X", "-w") } // Slow delays the network packets with opetions. -func (IPTables) Slow(ctx context.Context, opts SlowOptions) error { +func (IPTables) Slow(ctx context.Context, node string, opts SlowOptions) error { mean := fmt.Sprintf("%dms", opts.Mean.Nanoseconds()/int64(time.Millisecond)) variance := fmt.Sprintf("%dms", opts.Variance.Nanoseconds()/int64(time.Millisecond)) - return exec.CommandContext(ctx, "/sbin/tc", "qdisc", "add", "dev", "eth0", "root", "netem", "delay", - mean, variance, "distribution", opts.Distribution).Run() + return ssh.Exec(ctx, node, "/sbin/tc", "qdisc", "add", "dev", "eth0", "root", "netem", "delay", + mean, variance, "distribution", opts.Distribution) } // Flaky introduces randomized packet loss. 
-func (IPTables) Flaky(ctx context.Context) error { - return exec.CommandContext(ctx, "/sbin/tc", "qdisc", "add", "dev", "eth0", "root", "netem", "loss", - "20%", "75%").Run() +func (IPTables) Flaky(ctx context.Context, node string) error { + return ssh.Exec(ctx, node, "/sbin/tc", "qdisc", "add", "dev", "eth0", "root", "netem", "loss", + "20%", "75%") } // Fast removes packet loss and delays. -func (IPTables) Fast(ctx context.Context) error { - output, err := exec.CommandContext(ctx, "/sbin/tc", "qdisc", "del", "dev", "eth0", "root").CombinedOutput() +func (IPTables) Fast(ctx context.Context, node string) error { + output, err := ssh.CombinedOutput(ctx, node, "/sbin/tc", "qdisc", "del", "dev", "eth0", "root") if err != nil && strings.Contains(string(output), "RTNETLINK answers: No such file or directory") { err = nil } diff --git a/pkg/util/net/iptables_test.go b/pkg/util/net/iptables_test.go index fa79edd..c4740b4 100644 --- a/pkg/util/net/iptables_test.go +++ b/pkg/util/net/iptables_test.go @@ -12,31 +12,31 @@ func TestIPTables(t *testing.T) { ctx := context.Background() // should apt-get install iptables manually. 
- if err := iptables.Drop(ctx, "n1"); err != nil { + if err := iptables.Drop(ctx, "n1", "n2"); err != nil { t.Fatalf("drop network failed %v", err) } - if err := iptables.Heal(ctx); err != nil { + if err := iptables.Heal(ctx, "n1"); err != nil { t.Fatalf("heal netwrok failed %v", err) } - if err := iptables.Fast(ctx); err != nil { + if err := iptables.Fast(ctx, "n1"); err != nil { t.Fatalf("fast netwrok failed %v", err) } - if err := iptables.Slow(ctx, DefaultSlowOptions()); err != nil { + if err := iptables.Slow(ctx, "n1", DefaultSlowOptions()); err != nil { t.Fatalf("slow netwrok failed %v", err) } - if err := iptables.Fast(ctx); err != nil { + if err := iptables.Fast(ctx, "n1"); err != nil { t.Fatalf("fast netwrok failed %v", err) } - if err := iptables.Flaky(ctx); err != nil { + if err := iptables.Flaky(ctx, "n1"); err != nil { t.Fatalf("flaky netwrok failed %v", err) } - if err := iptables.Fast(ctx); err != nil { + if err := iptables.Fast(ctx, "n1"); err != nil { t.Fatalf("fast netwrok failed %v", err) } } diff --git a/pkg/util/net/net.go b/pkg/util/net/net.go index 5cadfb9..25c026a 100644 --- a/pkg/util/net/net.go +++ b/pkg/util/net/net.go @@ -23,16 +23,16 @@ func DefaultSlowOptions() SlowOptions { // Net is used to control the network. type Net interface { - // Drop drops traffic from node. - Drop(ctx context.Context, node string) error - // Heal ends all traffic drops and restores network to fast operations. - Heal(ctx context.Context) error - // Slow delays the network packets with options. - Slow(ctx context.Context, opts SlowOptions) error - // Flaky introduces randomized packet loss. - Flaky(ctx context.Context) error - // Fast removes packet loss and delays. - Fast(ctx context.Context) error + // Drop runs on the node and drops traffic from srcNode. + Drop(ctx context.Context, node string, srcNode string) error + // Heal runs on the node and ends all traffic drops and restores network to fast operations. 
+ Heal(ctx context.Context, node string) error + // Slow runs on the node and delays the network packets with options. + Slow(ctx context.Context, node string, opts SlowOptions) error + // Flaky runs on the node and introduces randomized packet loss. + Flaky(ctx context.Context, node string) error + // Fast runs on the node and removes packet loss and delays. + Fast(ctx context.Context, node string) error } // Noop implements Net interface but does nothing. @@ -41,16 +41,16 @@ type Noop struct { } // Drop drops traffic from node. -func (Noop) Drop(ctx context.Context, node string) error { return nil } +func (Noop) Drop(ctx context.Context, node string, srcNode string) error { return nil } // Heal ends all traffic drops and restores network to fast operations. -func (Noop) Heal(ctx context.Context) error { return nil } +func (Noop) Heal(ctx context.Context, node string) error { return nil } // Slow delays the network packets with opetions. -func (Noop) Slow(ctx context.Context, opts SlowOptions) error { return nil } +func (Noop) Slow(ctx context.Context, node string, opts SlowOptions) error { return nil } // Flaky introduces randomized packet loss. -func (Noop) Flaky(ctx context.Context) error { return nil } +func (Noop) Flaky(ctx context.Context, node string) error { return nil } // Fast removes packet loss and delays. -func (Noop) Fast(ctx context.Context) error { return nil } +func (Noop) Fast(ctx context.Context, node string) error { return nil } diff --git a/pkg/util/ssh/ssh.go b/pkg/util/ssh/ssh.go index 29a1d31..071a8d4 100644 --- a/pkg/util/ssh/ssh.go +++ b/pkg/util/ssh/ssh.go @@ -3,14 +3,32 @@ package ssh import ( "context" "fmt" + "log" "os/exec" ) // Exec executes the cmd on the remote node. // Here we assume we can run with `ssh node cmd` directly // TODO: add a SSH config? 
-func Exec(ctx context.Context, node string, cmd string) error { - return exec.CommandContext(ctx, "ssh", node, cmd).Run() +func Exec(ctx context.Context, node string, cmd string, args ...string) error { + _, err := CombinedOutput(ctx, node, cmd, args...) + return err +} + +// CombinedOutput executes the cmd on the remote node and returns its combined standard +// output and standard error. +func CombinedOutput(ctx context.Context, node string, cmd string, args ...string) ([]byte, error) { + v := []string{ + node, + cmd, + } + v = append(v, args...) + data, err := exec.CommandContext(ctx, "ssh", v...).CombinedOutput() + if err != nil { + // For debug + log.Printf("%v %q %v", v, data, err) + } + return data, err } // Upload uploads files from local path to remote node path. diff --git a/pkg/util/util.go b/pkg/util/util.go index dd0b29a..7c5acb5 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -2,34 +2,31 @@ package util import ( "context" - "io/ioutil" + "fmt" "net/url" - "os" - "os/exec" "path" "path/filepath" "strings" - "syscall" + "time" + + "github.com/siddontang/chaos/pkg/util/ssh" ) -// IsFileExist returns true if the file exists. -func IsFileExist(name string) bool { - _, err := os.Stat(name) +// IsFileExist runs on node and returns true if the file exists. +func IsFileExist(ctx context.Context, node string, name string) bool { + err := ssh.Exec(ctx, node, "stat", name) return err == nil } -// IsProcessExist returns true if the porcess still exists. -func IsProcessExist(pid int) bool { - p, err := os.FindProcess(pid) - if err != nil { - return false - } - return p.Signal(syscall.Signal(0)) == nil +// IsProcessExist runs on node and returns true if the porcess still exists. +func IsProcessExist(ctx context.Context, node string, pid int) bool { + err := ssh.Exec(ctx, node, "kill", fmt.Sprintf("-s 0 %d", pid)) + return err == nil } -// Wget downloads a string URL to the dest directory and returns the file path. 
+// Wget runs on node, downloads a string URL to the dest directory and returns the file path. // SKips if the file already exists. -func Wget(ctx context.Context, rawURL string, dest string) (string, error) { +func Wget(ctx context.Context, node string, rawURL string, dest string) (string, error) { u, err := url.Parse(rawURL) if err != nil { return "", err @@ -41,45 +38,42 @@ func Wget(ctx context.Context, rawURL string, dest string) (string, error) { fileName := path.Base(u.Path) filePath := path.Join(dest, fileName) - if IsFileExist(filePath) { + if IsFileExist(ctx, node, filePath) { return filePath, nil } - os.MkdirAll(dest, 0755) - err = exec.CommandContext(ctx, "wget", "--tries", "20", "--waitretry", "60", + Mkdir(ctx, node, dest) + err = ssh.Exec(ctx, node, "wget", "--tries", "20", "--waitretry", "60", "--retry-connrefused", "--dns-timeout", "60", "--connect-timeout", "60", - "--read-timeout", "60", "--directory-prefix", dest, rawURL).Run() + "--read-timeout", "60", "--directory-prefix", dest, rawURL) return filePath, err } -// InstallArchive downloads the URL and extracts the archive to the dest diretory. +// InstallArchive runs on node, downloads the URL and extracts the archive to the dest diretory. // Supports zip, and tarball. 
-func InstallArchive(ctx context.Context, rawURL string, dest string) error { - err := os.MkdirAll("/tmp/chaos", 0755) +func InstallArchive(ctx context.Context, node string, rawURL string, dest string) error { + err := ssh.Exec(ctx, node, "mkdir", "-p", "/tmp/chaos") if err != nil { return err } - var tmpDir string - if tmpDir, err = ioutil.TempDir("/tmp/chaos", "archive_"); err != nil { - return err - } - - defer os.RemoveAll(tmpDir) + tmpDir := fmt.Sprintf("/tmp/chaos/archive_%d", time.Now().UnixNano()) + Mkdir(ctx, node, tmpDir) + defer RemoveDir(ctx, node, tmpDir) var name string if strings.HasPrefix(rawURL, "file://") { - name = strings.Trim(rawURL, "file://") + name = rawURL[len("file://"):] } else { - if name, err = Wget(ctx, rawURL, "/tmp/chaos"); err != nil { + if name, err = Wget(ctx, node, rawURL, "/tmp/chaos"); err != nil { return err } } if strings.HasSuffix(name, ".zip") { - err = exec.CommandContext(ctx, "unzip", "-d", tmpDir, name).Run() + err = ssh.Exec(ctx, node, "unzip", "-d", tmpDir, name) } else { - err = exec.CommandContext(ctx, "tar", "-xf", name, "-C", tmpDir).Run() + err = ssh.Exec(ctx, node, "tar", "-xf", name, "-C", tmpDir) } if err != nil { @@ -90,23 +84,64 @@ func InstallArchive(ctx context.Context, rawURL string, dest string) error { return err } - os.RemoveAll(dest) - os.MkdirAll(path.Dir(dest), 0755) + RemoveDir(ctx, node, dest) + Mkdir(ctx, node, path.Dir(dest)) - var files []os.FileInfo - if files, err = ioutil.ReadDir(tmpDir); err != nil { + var files []string + if files, err = ReadDir(ctx, node, tmpDir); err != nil { return err - } else if len(files) == 1 && files[0].IsDir() { - return os.Rename(path.Join(tmpDir, files[0].Name()), dest) + } else if len(files) == 1 && IsDir(ctx, node, path.Join(tmpDir, files[0])) { + return ssh.Exec(ctx, node, "mv", path.Join(tmpDir, files[0]), dest) + } + + return ssh.Exec(ctx, node, "mv", tmpDir, dest) +} + +// ReadDir runs on node and lists the files of dir. 
+func ReadDir(ctx context.Context, node string, dir string) ([]string, error) { + output, err := ssh.CombinedOutput(ctx, node, "ls", dir) + if err != nil { + return nil, err + } + + seps := strings.Split(string(output), "\n") + v := make([]string, 0, len(seps)) + for _, sep := range seps { + sep = strings.TrimSpace(sep) + if len(sep) > 0 { + v = append(v, sep) + } } - return os.Rename(tmpDir, dest) + return v, nil +} + +// IsDir runs on node and checks path is directory or not. +func IsDir(ctx context.Context, node string, path string) bool { + err := ssh.Exec(ctx, node, "test", "-d", path) + return err == nil +} + +// Mkdir runs on node and makes a directory +func Mkdir(ctx context.Context, node string, dir string) error { + return ssh.Exec(ctx, node, "mkdir", "-p", dir) +} + +// RemoveDir runs on node and removes the diretory +func RemoveDir(ctx context.Context, node string, dir string) error { + return ssh.Exec(ctx, node, "rm", "-rf", dir) +} + +// WriteFile runs on node and writes data to file +func WriteFile(ctx context.Context, node string, file string, data string) error { + return ssh.Exec(ctx, node, "echo", "-e", data, ">", file) } // DaemonOptions is the options to start a command in daemon mode. type DaemonOptions struct { ChDir string PidFile string + NoClose bool } // NewDaemonOptions returns a default daemon options. 
@@ -114,14 +149,18 @@ func NewDaemonOptions(chDir string, pidFile string) DaemonOptions { return DaemonOptions{ ChDir: chDir, PidFile: pidFile, + NoClose: false, } } -// StartDaemon starts a daemon process with options -func StartDaemon(ctx context.Context, opts DaemonOptions, cmd string, cmdArgs ...string) error { +// StartDaemon runs on node and starts a daemon process with options +func StartDaemon(ctx context.Context, node string, opts DaemonOptions, cmd string, cmdArgs ...string) error { var args []string args = append(args, "--start") - args = append(args, "--background", "--no-close") + args = append(args, "--background") + if opts.NoClose { + args = append(args, "--no-close") + } args = append(args, "--make-pidfile") processName := path.Base(cmd) @@ -133,13 +172,11 @@ func StartDaemon(ctx context.Context, opts DaemonOptions, cmd string, cmdArgs .. args = append(args, "--") args = append(args, cmdArgs...) - c := exec.CommandContext(ctx, "start-stop-daemon", args...) - - return c.Run() + return ssh.Exec(ctx, node, "start-stop-daemon", args...) } -func parsePID(pidFile string) string { - data, err := ioutil.ReadFile(pidFile) +func parsePID(ctx context.Context, node string, pidFile string) string { + data, err := ssh.CombinedOutput(ctx, node, "cat", pidFile) if err != nil { return "" } @@ -147,28 +184,28 @@ func parsePID(pidFile string) string { return strings.TrimSpace(string(data)) } -// StopDaemon stops the daemon process. -func StopDaemon(ctx context.Context, cmd string, pidFile string) error { - return stopDaemon(ctx, cmd, pidFile, "TERM") +// StopDaemon runs on node and stops the daemon process. +func StopDaemon(ctx context.Context, node string, cmd string, pidFile string) error { + return stopDaemon(ctx, node, cmd, pidFile, "TERM") } -// KillDaemon kills the daemon process. -func KillDaemon(ctx context.Context, cmd string, pidFile string) error { - return stopDaemon(ctx, cmd, pidFile, "KILL") +// KillDaemon runs on node and kills the daemon process. 
+func KillDaemon(ctx context.Context, node string, cmd string, pidFile string) error { + return stopDaemon(ctx, node, cmd, pidFile, "KILL") } -func stopDaemon(ctx context.Context, cmd string, pidFile string, sig string) error { +func stopDaemon(ctx context.Context, node string, cmd string, pidFile string, sig string) error { name := path.Base(cmd) - return exec.CommandContext(ctx, "start-stop-daemon", "--stop", "--remove-pidfile", - "--pidfile", pidFile, "--oknodo", "--name", name, "--signal", sig).Run() + return ssh.Exec(ctx, node, "start-stop-daemon", "--stop", "--remove-pidfile", + "--pidfile", pidFile, "--oknodo", "--name", name, "--signal", sig) } -// IsDaemonRunning returns whether the daemon is still running or not. -func IsDaemonRunning(ctx context.Context, cmd string, pidFile string) bool { +// IsDaemonRunning runs on node and returns whether the daemon is still running or not. +func IsDaemonRunning(ctx context.Context, node string, cmd string, pidFile string) bool { name := path.Base(cmd) - err := exec.CommandContext(ctx, "start-stop-daemon", "--status", "--pidfile", pidFile, "--name", name).Run() + err := ssh.Exec(ctx, node, "start-stop-daemon", "--status", "--pidfile", pidFile, "--name", name) return err == nil } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 5ef0b3e..bfa199b 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -2,13 +2,14 @@ package util import ( "context" - "io/ioutil" - "os" - "os/exec" + "fmt" "path" "strconv" + "strings" "testing" "time" + + "github.com/siddontang/chaos/pkg/util/ssh" ) func TestWget(t *testing.T) { @@ -17,51 +18,50 @@ func TestWget(t *testing.T) { err error ) for i := 0; i < 2; i++ { - name, err = Wget(context.Background(), "https://raw.githubusercontent.com/pingcap/tikv/master/Cargo.toml", ".") + name, err = Wget(context.Background(), "n1", "https://raw.githubusercontent.com/pingcap/tikv/master/Cargo.toml", ".") if err != nil { t.Fatalf("download failed %v", err) } - if 
!IsFileExist(name) { + if !IsFileExist(context.Background(), "n1", name) { t.Fatalf("stat %s failed %v", name, err) } } - os.Remove(name) + RemoveDir(context.Background(), "n1", name) } func TestInstallArchive(t *testing.T) { - tmpDir, _ := ioutil.TempDir(".", "var") - defer os.RemoveAll(tmpDir) + tmpDir := fmt.Sprintf("/tmp/chaos/test_%d", time.Now().UnixNano()) + t.Logf("install on %s", tmpDir) + + Mkdir(context.Background(), "n1", tmpDir) + defer RemoveDir(context.Background(), "n1", tmpDir) - err := InstallArchive(context.Background(), "https://github.com/siddontang/chaos/archive/master.zip", path.Join(tmpDir, "1")) + err := InstallArchive(context.Background(), "n1", "https://github.com/siddontang/chaos/archive/master.zip", path.Join(tmpDir, "1")) if err != nil { t.Fatalf("install archive failed %v", err) } - err = InstallArchive(context.Background(), "https://github.com/siddontang/chaos/archive/master.tar.gz", path.Join(tmpDir, "2")) + err = InstallArchive(context.Background(), "n1", "https://github.com/siddontang/chaos/archive/master.tar.gz", path.Join(tmpDir, "2")) if err != nil { t.Fatalf("install archive failed %v", err) } archFile := path.Join(tmpDir, "a.tar.gz") - testCreateArichive(t, path.Join(tmpDir, "test"), archFile) - err = InstallArchive(context.Background(), "file://"+archFile, path.Join(tmpDir, "3")) + testCreateArchive(context.Background(), t, path.Join(tmpDir, "test"), archFile) + err = InstallArchive(context.Background(), "n1", "file://"+archFile, path.Join(tmpDir, "3")) if err != nil { t.Fatalf("install archive failed %v", err) } } -func testCreateArichive(t *testing.T, srcDir string, name string) { - os.MkdirAll(srcDir, 0755) - f, err := os.Create(path.Join(srcDir, "a.log")) - if err != nil { - t.Fatalf("create file failed %v", err) - } - f.WriteString("hello world") - f.Close() +func testCreateArchive(ctx context.Context, t *testing.T, srcDir string, name string) { + t.Logf("crate archieve %s from %s", name, srcDir) + Mkdir(ctx, "n1", 
srcDir) + WriteFile(ctx, "n1", path.Join(srcDir, "a.log"), "\"hello world\"") - if err = exec.Command("tar", "-cf", name, "-C", srcDir, ".").Run(); err != nil { + if err := ssh.Exec(ctx, "n1", "tar", "-cf", name, "-C", srcDir, "."); err != nil { t.Fatalf("tar %s to %s failed %v", srcDir, name, err) } } @@ -69,47 +69,65 @@ func testCreateArichive(t *testing.T, srcDir string, name string) { func TestDaemon(t *testing.T) { t.Log("test may only be run in the chaos docker") - tmpDir, _ := ioutil.TempDir(".", "var") - defer os.RemoveAll(tmpDir) + tmpDir := fmt.Sprintf("/tmp/chaos/var_%d", time.Now().UnixNano()) + Mkdir(context.Background(), "n1", tmpDir) + defer RemoveDir(context.Background(), "n1", tmpDir) cmd := "/bin/sleep" pidFile := path.Join(tmpDir, "sleep.pid") opts := NewDaemonOptions(tmpDir, pidFile) - err := StartDaemon(context.Background(), opts, cmd, "100") + err := StartDaemon(context.Background(), "n1", opts, cmd, "100") if err != nil { t.Fatalf("start daemon failed %v", err) } - pidStr := parsePID(pidFile) + pidStr := parsePID(context.Background(), "n1", pidFile) if pidStr == "" { t.Fatal("must have a pid file") } pid, _ := strconv.Atoi(pidStr) - if !IsProcessExist(pid) { + if !IsProcessExist(context.Background(), "n1", pid) { t.Fatalf("pid %d must exist", pid) } - if !IsDaemonRunning(context.Background(), cmd, pidFile) { + if !IsDaemonRunning(context.Background(), "n1", cmd, pidFile) { t.Fatal("daemon must be running") } - err = StopDaemon(context.Background(), cmd, pidFile) + err = StopDaemon(context.Background(), "n1", cmd, pidFile) if err != nil { t.Fatalf("stop daemon failed %v", err) } time.Sleep(time.Second) - if IsProcessExist(pid) { + if IsProcessExist(context.Background(), "n1", pid) { t.Fatalf("pid %d must not exist", pid) } - if IsFileExist(pidFile) { + if IsFileExist(context.Background(), "n1", pidFile) { t.Fatalf("pid file must not exist") } - if IsDaemonRunning(context.Background(), cmd, pidFile) { + if IsDaemonRunning(context.Background(), 
"n1", cmd, pidFile) { t.Fatal("daemon must be not running") } } + +func TestWriteFile(t *testing.T) { + name := "/tmp/chaos/test.log" + if err := WriteFile(context.Background(), "n1", name, "\"[section]\na=b\""); err != nil { + t.Fatalf("write file failed %v", err) + } + + data, err := ssh.CombinedOutput(context.Background(), "n1", "cat", name) + if err != nil { + t.Fatalf("read file failed %v", err) + } + + seps := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(seps) != 2 { + t.Fatalf("invalid read data %s", data) + } +} diff --git a/scripts/deploy_agent.sh b/scripts/deploy_agent.sh deleted file mode 100755 index a414820..0000000 --- a/scripts/deploy_agent.sh +++ /dev/null @@ -1,4 +0,0 @@ -for i in {1..5} -do - scp bin/chaos-agent n$i:/root/chaos-agent -done diff --git a/scripts/start_agent.sh b/scripts/start_agent.sh deleted file mode 100755 index 461162f..0000000 --- a/scripts/start_agent.sh +++ /dev/null @@ -1,6 +0,0 @@ -for i in {1..5} -do - ssh n$i start-stop-daemon --start --background --make-pidfile --pidfile /root/agent.pid \ - --chdir /root --oknodo --startas /root/chaos-agent --name chaos-agent \ - -- --addr :8080 --log-file /root/agent.log -done \ No newline at end of file diff --git a/scripts/stop_agent.sh b/scripts/stop_agent.sh deleted file mode 100755 index f29ec27..0000000 --- a/scripts/stop_agent.sh +++ /dev/null @@ -1,4 +0,0 @@ -for i in {1..5} -do - ssh n$i start-stop-daemon --stop --name chaos-agent --pidfile /root/agent.pid --oknodo --remove-pidfile -done \ No newline at end of file