Skip to content

Commit

Permalink
run install TiDB synchronously (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang authored May 31, 2018
1 parent 0b153f4 commit 18cc215
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 6 deletions.
1 change: 1 addition & 0 deletions db/tidb/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (c *bankClient) SetUp(ctx context.Context, nodes []string, node string) err
return nil
}

log.Printf("begin to create table accounts on node %s", node)
sql := `create table if not exists accounts
(id int not null primary key,
balance bigint not null)`
Expand Down
35 changes: 31 additions & 4 deletions db/tidb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ var (

// db is the TiDB database.
type db struct {
nodes []string
nodes []string
installBlocker util.BlockRunner
}

// SetUp initializes the database.
Expand All @@ -46,15 +47,30 @@ func (db *db) SetUp(ctx context.Context, nodes []string, node string) error {

db.nodes = nodes

db.installBlocker.Init(len(nodes))

log.Printf("install archieve on node %s", node)
if err := util.InstallArchive(ctx, node, archiveURL, deployDir); err != nil {

var err error
db.installBlocker.Run(func() {
err = util.InstallArchive(ctx, node, archiveURL, deployDir)
})
if err != nil {
return err
}

util.Mkdir(ctx, node, path.Join(deployDir, "conf"))
util.Mkdir(ctx, node, path.Join(deployDir, "log"))

if err := util.WriteFile(ctx, node, pdConfig, strconv.Quote("[replication]\nmax-replicas=5")); err != nil {
pdCfs := []string{
"tick-interval=\"100ms\"",
"election-interval=\"500ms\"",
"tso-save-interval=\"500ms\"",
"[replication]",
"max-replicas=5",
}

if err := util.WriteFile(ctx, node, pdConfig, strconv.Quote(strings.Join(pdCfs, "\n"))); err != nil {
return err
}

Expand Down Expand Up @@ -161,8 +177,19 @@ func (db *db) start(ctx context.Context, node string, inSetUp bool) error {
return err
}

var err error
if inSetUp {
time.Sleep(30 * time.Second)
for i := 0; i < 12; i++ {
if err = ssh.Exec(ctx, node, "curl", fmt.Sprintf("http://%s:10080/status", node)); err == nil {
break
}
log.Printf("try to wait tidb run on %s", node)
time.Sleep(10 * time.Second)
}
}

if err != nil {
return err
}

if !util.IsDaemonRunning(ctx, node, tidbBinary, tidbPID) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (c *Controller) Run() {
c.tearDownDB()

c.recorder.Close()

log.Printf("finish test")
}

func (c *Controller) syncExec(f func(i int)) {
Expand Down Expand Up @@ -158,7 +160,7 @@ func (c *Controller) setUpClient() {
}

func (c *Controller) tearDownClient() {
log.Printf("begin to set up client")
log.Printf("begin to tear down client")
c.syncExec(func(i int) {
client := c.clients[i]
node := c.nodes[i]
Expand All @@ -173,6 +175,8 @@ func (c *Controller) onClientLoop(i int) {
client := c.clients[i]
node := c.nodes[i]

log.Printf("begin to run command on node %s", node)

ctx, cancel := context.WithTimeout(c.ctx, c.cfg.RunTime)
defer cancel()

Expand Down
12 changes: 11 additions & 1 deletion pkg/util/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package ssh

import (
"context"
"flag"
"fmt"
"log"
"os/exec"
)

var (
verbose = flag.Bool("ssh-verbose", false, "show the verbose of SSH command")
)

// Exec executes the cmd on the remote node.
// Here we assume we can run with `ssh node cmd` directly
// TODO: add a SSH config?
Expand All @@ -23,10 +28,15 @@ func CombinedOutput(ctx context.Context, node string, cmd string, args ...string
cmd,
}
v = append(v, args...)
if *verbose {
log.Printf("run %s %v on node %s", cmd, args, node)
}
data, err := exec.CommandContext(ctx, "ssh", v...).CombinedOutput()
if err != nil {
// For debug
log.Printf("%v %q %v", v, data, err)
if *verbose {
log.Printf("fail to run %v %q %v", v, data, err)
}
}
return data, err
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package util

import "sync"

// BlockRunner provides a simple way to run tasks,
// block until all the tasks are finished.
type BlockRunner struct {
once sync.Once
wg sync.WaitGroup
}

// Init initializes how many tasks we want to run synchronously.
func (r *BlockRunner) Init(n int) {
r.once.Do(func() {
r.wg.Add(n)
})
}

// Run runs the task in different goroutines and
// block until all the tasks are finished.
func (r *BlockRunner) Run(f func()) {
f()
r.wg.Done()
r.wg.Wait()
}
39 changes: 39 additions & 0 deletions pkg/util/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package util

import (
"testing"
"time"
)

func TestBlockRunner(t *testing.T) {
r := &BlockRunner{}

r.Init(2)

ch := make(chan int, 2)
for i := 0; i < 2; i++ {
go func(i int) {
// Can only initialize once
r.Init(3)
r.Run(func() {
t.Logf("block run %d", i+1)
time.Sleep(time.Second)
})
ch <- i
}(i)
}

select {
case <-ch:
t.Fatal("can't get data at this time")
case <-time.After(100 * time.Millisecond):
}

for i := 0; i < 2; i++ {
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatal("can't wait ")
}
}
}

0 comments on commit 18cc215

Please sign in to comment.