From 18cc2159b2fd35b4c9fb9bd1a013b5fb2b52fcd2 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 31 May 2018 09:56:58 +0800 Subject: [PATCH] run install TiDB synchronously (#7) --- db/tidb/bank.go | 1 + db/tidb/db.go | 35 +++++++++++++++++++++++++++++++---- pkg/control/control.go | 6 +++++- pkg/util/ssh/ssh.go | 12 +++++++++++- pkg/util/sync.go | 25 +++++++++++++++++++++++++ pkg/util/sync_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 pkg/util/sync.go create mode 100644 pkg/util/sync_test.go diff --git a/db/tidb/bank.go b/db/tidb/bank.go index ee96e63..9a9b415 100644 --- a/db/tidb/bank.go +++ b/db/tidb/bank.go @@ -45,6 +45,7 @@ func (c *bankClient) SetUp(ctx context.Context, nodes []string, node string) err return nil } + log.Printf("begin to create table accounts on node %s", node) sql := `create table if not exists accounts (id int not null primary key, balance bigint not null)` diff --git a/db/tidb/db.go b/db/tidb/db.go index ab9ec8e..e08d330 100644 --- a/db/tidb/db.go +++ b/db/tidb/db.go @@ -34,7 +34,8 @@ var ( // db is the TiDB database. type db struct { - nodes []string + nodes []string + installBlocker util.BlockRunner } // SetUp initializes the database. @@ -46,15 +47,30 @@ func (db *db) SetUp(ctx context.Context, nodes []string, node string) error { db.nodes = nodes + db.installBlocker.Init(len(nodes)) + log.Printf("install archieve on node %s", node) - if err := util.InstallArchive(ctx, node, archiveURL, deployDir); err != nil { + + var err error + db.installBlocker.Run(func() { + err = util.InstallArchive(ctx, node, archiveURL, deployDir) + }) + if err != nil { return err } util.Mkdir(ctx, node, path.Join(deployDir, "conf")) util.Mkdir(ctx, node, path.Join(deployDir, "log")) - if err := util.WriteFile(ctx, node, pdConfig, strconv.Quote("[replication]\nmax-replicas=5")); err != nil { + pdCfs := []string{ + "tick-interval=\"100ms\"", + "election-interval=\"500ms\"", + "tso-save-interval=\"500ms\"", + "[replication]", + "max-replicas=5", + } + + if err := util.WriteFile(ctx, node, pdConfig, strconv.Quote(strings.Join(pdCfs, "\n"))); err != nil { return err } @@ -161,8 +177,19 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error { return err } + var err error if inSetUp { - time.Sleep(30 * time.Second) + for i := 0; i < 12; i++ { + if err = ssh.Exec(ctx, node, "curl", fmt.Sprintf("http://%s:10080/status", node)); err == nil { + break + } + log.Printf("try to wait tidb run on %s", node) + time.Sleep(10 * time.Second) + } + } + + if err != nil { + return err } if !util.IsDaemonRunning(ctx, node, tidbBinary, tidbPID) { diff --git a/pkg/control/control.go b/pkg/control/control.go index c9f07e0..6e8cd7f 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -107,6 +107,8 @@ func (c *Controller) Run() { c.tearDownDB() c.recorder.Close() + + log.Printf("finish test") } func (c *Controller) syncExec(f func(i int)) { @@ -158,7 +160,7 @@ func (c *Controller) setUpClient() { } func (c *Controller) tearDownClient() { - log.Printf("begin to set up client") + log.Printf("begin to tear down client") c.syncExec(func(i int) { client := c.clients[i] node := c.nodes[i] @@ -173,6 +175,8 @@ func (c *Controller) onClientLoop(i int) { client := c.clients[i] node := c.nodes[i] + log.Printf("begin to run command on node %s", node) + ctx, cancel := context.WithTimeout(c.ctx, c.cfg.RunTime) defer cancel() diff --git a/pkg/util/ssh/ssh.go b/pkg/util/ssh/ssh.go index 071a8d4..8b73f12 100644 --- a/pkg/util/ssh/ssh.go +++ b/pkg/util/ssh/ssh.go @@ -2,11 +2,16 @@ package ssh import ( "context" + "flag" "fmt" "log" "os/exec" ) +var ( + verbose = flag.Bool("ssh-verbose", false, "show the verbose of SSH command") +) + // Exec executes the cmd on the remote node. // Here we assume we can run with `ssh node cmd` directly // TODO: add a SSH config? @@ -23,10 +28,15 @@ func CombinedOutput(ctx context.Context, node string, cmd string, args ...string cmd, } v = append(v, args...) + if *verbose { + log.Printf("run %s %v on node %s", cmd, args, node) + } data, err := exec.CommandContext(ctx, "ssh", v...).CombinedOutput() if err != nil { // For debug - log.Printf("%v %q %v", v, data, err) + if *verbose { + log.Printf("fail to run %v %q %v", v, data, err) + } } return data, err } diff --git a/pkg/util/sync.go b/pkg/util/sync.go new file mode 100644 index 0000000..94b0250 --- /dev/null +++ b/pkg/util/sync.go @@ -0,0 +1,25 @@ +package util + +import "sync" + +// BlockRunner provides a simple way to run tasks, +// block until all the tasks are finished. +type BlockRunner struct { + once sync.Once + wg sync.WaitGroup +} + +// Init initializes how many tasks we want to run synchronously. +func (r *BlockRunner) Init(n int) { + r.once.Do(func() { + r.wg.Add(n) + }) +} + +// Run runs the task in different goroutines and +// block until all the tasks are finished. +func (r *BlockRunner) Run(f func()) { + f() + r.wg.Done() + r.wg.Wait() +} diff --git a/pkg/util/sync_test.go b/pkg/util/sync_test.go new file mode 100644 index 0000000..08f67b3 --- /dev/null +++ b/pkg/util/sync_test.go @@ -0,0 +1,39 @@ +package util + +import ( + "testing" + "time" +) + +func TestBlockRunner(t *testing.T) { + r := &BlockRunner{} + + r.Init(2) + + ch := make(chan int, 2) + for i := 0; i < 2; i++ { + go func(i int) { + // Can only initialize once + r.Init(3) + r.Run(func() { + t.Logf("block run %d", i+1) + time.Sleep(time.Second) + }) + ch <- i + }(i) + } + + select { + case <-ch: + t.Fatal("can't get data at this time") + case <-time.After(100 * time.Millisecond): + } + + for i := 0; i < 2; i++ { + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatal("can't wait ") + } + } +}