From d2b3a101d992440ecb8b6030dc4076747703520b Mon Sep 17 00:00:00 2001 From: "Ben \"Pig\" Chu" Date: Tue, 4 Jun 2019 15:04:54 +0800 Subject: [PATCH] Add long fork test (#32) --- cmd/tidb/main.go | 6 + db/tidb/long_fork.go | 363 ++++++++++++++++++++++++++++++++++++++ db/tidb/long_fork_test.go | 44 +++++ pkg/util/util.go | 4 +- 4 files changed, 416 insertions(+), 1 deletion(-) create mode 100644 db/tidb/long_fork.go create mode 100644 db/tidb/long_fork_test.go diff --git a/cmd/tidb/main.go b/cmd/tidb/main.go index aff6587..abd8017 100644 --- a/cmd/tidb/main.go +++ b/cmd/tidb/main.go @@ -48,6 +48,8 @@ func main() { creator = tidb.BankClientCreator{} case "multi_bank": creator = tidb.MultiBankClientCreator{} + case "long_fork": + creator = tidb.LongForkClientCreator{} case "sequential": creator = tidb.SequentialClientCreator{} default: @@ -62,6 +64,10 @@ func main() { checker = porcupine.Checker{} case "tidb_bank_tso": checker = tidb.BankTsoChecker() + case "long_fork_checker": + checker = tidb.LongForkChecker() + parser = tidb.LongForkParser() + model = nil case "sequential_checker": checker = tidb.NewSequentialChecker() parser = tidb.NewSequentialParser() diff --git a/db/tidb/long_fork.go b/db/tidb/long_fork.go new file mode 100644 index 0000000..0fa590a --- /dev/null +++ b/db/tidb/long_fork.go @@ -0,0 +1,363 @@ +package tidb + +import ( + "context" + "database/sql" + "encoding/binary" + "encoding/json" + "fmt" + "hash/fnv" + "log" + "math/rand" + "sort" + "sync" + "time" + + "github.com/pingcap/chaos/pkg/core" + "github.com/pingcap/chaos/pkg/history" +) + +const ( + lfRead = "read" + lfWrite = "write" + lfGroupSize = 10 +) + +type lfRequest struct { + Kind string + // length=1 for write + Keys []uint64 +} +type lfResponse struct { + Ok bool + Unknown bool + Keys []uint64 + Values []sql.NullInt64 +} + +var ( + lfState = struct { + mu sync.Mutex + nextKey uint64 + workers map[string]uint64 + }{ + mu: sync.Mutex{}, + nextKey: 0, + workers: make(map[string]uint64), + } +) + +type longForkClient struct { + db *sql.DB + r *rand.Rand + tableCount int + node string +} + +func lfTableNames(tableCount int) []string { + names := make([]string, 0, tableCount) + for i := 0; i < tableCount; i++ { + names = append(names, fmt.Sprintf("txn_lf_%d", i)) + } + return names +} + +func lfKey2Table(tableCount int, key uint64) string { + b := make([]byte, 8) + binary.PutUvarint(b, key) + h := fnv.New32a() + h.Write(b) + hash := int(h.Sum32()) + return fmt.Sprintf("txn_lf_%d", hash%tableCount) +} + +func (c *longForkClient) SetUp(ctx context.Context, nodes []string, node string) error { + c.r = rand.New(rand.NewSource(time.Now().UnixNano())) + db, err := sql.Open("mysql", fmt.Sprintf("root@tcp(%s:4000)/test", node)) + if err != nil { + return err + } + c.db = db + + db.SetMaxIdleConns(1 + c.tableCount) + + // Do SetUp in the first node + if node != nodes[0] { + return nil + } + + log.Printf("begin to create %v tables on node %s", c.tableCount, node) + for _, tableName := range lfTableNames(c.tableCount) { + log.Printf("try to drop table %s", tableName) + if _, err = db.ExecContext(ctx, + fmt.Sprintf("drop table if exists %s", tableName)); err != nil { + return err + } + query := "create table if not exists %s (id int not null primary key,sk int not null,val int)" + if _, err = db.ExecContext(ctx, fmt.Sprintf(query, tableName)); err != nil { + return err + } + log.Printf("created table %s", tableName) + } + + return nil +} + +func (c *longForkClient) TearDown(ctx context.Context, nodes []string, node string) error { + return c.db.Close() +} + +func (c *longForkClient) Invoke(ctx context.Context, node string, r interface{}) interface{} { + arg := r.(lfRequest) + if arg.Kind == lfWrite { + + key := arg.Keys[0] + query := fmt.Sprintf("insert into %s (id, sk, val) values (?, ?, ?) on duplicate key update val = ?", lfKey2Table(c.tableCount, key)) + _, err := c.db.ExecContext(ctx, query, key, key, 1, 1) + if err != nil { + return lfResponse{Ok: false} + } + values := make([]sql.NullInt64, 0) + return lfResponse{Ok: true, Keys: []uint64{key}, Values: values} + + } else if arg.Kind == lfRead { + + txn, err := c.db.Begin() + defer txn.Rollback() + if err != nil { + return lfResponse{Ok: false} + } + + values := make([]sql.NullInt64, len(arg.Keys)) + for i, key := range arg.Keys { + query := fmt.Sprintf("select (val) from %s where id = ?", lfKey2Table(c.tableCount, key)) + err := txn.QueryRowContext(ctx, query, key).Scan(&values[i]) + if err != nil { + if err == sql.ErrNoRows { + values[i] = sql.NullInt64{Valid: false} + } else { + return lfResponse{Ok: false} + } + } + } + + if err = txn.Commit(); err != nil { + return lfResponse{Unknown: true, Keys: arg.Keys, Values: values} + } + return lfResponse{Ok: true, Keys: arg.Keys, Values: values} + + } else { + panic(fmt.Sprintf("unknown req %v", r)) + } +} + +func (c *longForkClient) NextRequest() interface{} { + lfState.mu.Lock() + defer lfState.mu.Unlock() + + key, present := lfState.workers[c.node] + if present { + delete(lfState.workers, c.node) + return lfRequest{Kind: lfRead, Keys: makeKeysInGroup(c.r, lfGroupSize, key)} + } + + if c.r.Int()%2 == 0 { + if size := len(lfState.workers); size > 0 { + others := make([]uint64, size) + idx := 0 + for _, value := range lfState.workers { + others[idx] = value + idx++ + } + key := others[c.r.Intn(size)] + return lfRequest{Kind: lfRead, Keys: makeKeysInGroup(c.r, lfGroupSize, key)} + } + } + + key = lfState.nextKey + lfState.nextKey++ + lfState.workers[c.node] = key + return lfRequest{Kind: lfWrite, Keys: []uint64{key}} +} + +func (c *longForkClient) DumpState(ctx context.Context) (interface{}, error) { + return nil, nil +} + +func makeKeysInGroup(r *rand.Rand, groupSize uint64, key uint64) []uint64 { + lower := key - key%groupSize + base := r.Perm(int(groupSize)) + result := make([]uint64, groupSize) + for i, num := range base { + result[i] = uint64(num) + lower + } + return result +} + +// LongForkClientCreator creates long fork test clients for tidb. +type LongForkClientCreator struct { +} + +// Create creates a new longForkClient. +func (LongForkClientCreator) Create(node string) core.Client { + return &longForkClient{ + tableCount: 7, + node: node, + } +} + +type lfParser struct{} + +func (p lfParser) OnRequest(data json.RawMessage) (interface{}, error) { + r := lfRequest{} + err := json.Unmarshal(data, &r) + return r, err +} + +func (p lfParser) OnResponse(data json.RawMessage) (interface{}, error) { + r := lfResponse{} + err := json.Unmarshal(data, &r) + // I have no idea why we need this + if r.Unknown { + return nil, err + } + return r, err +} + +func (p lfParser) OnNoopResponse() interface{} { + return lfResponse{Unknown: true} +} + +func (p lfParser) OnState(data json.RawMessage) (interface{}, error) { + return nil, nil +} + +// LongForkParser parses a history of long fork test. +func LongForkParser() history.RecordParser { + return lfParser{} +} + +type lfChecker struct{} + +func ensureNoLongForks(ops []core.Operation, groupSize int) (bool, error) { + // why we cannot have something like map,T> in golang? + keyset := make(map[string][]uint64) + groups := make(map[string][][]sql.NullInt64) + for _, op := range ops { + if op.Action != core.ReturnOperation { + continue + } + res := op.Data.(lfResponse) + // you con not get request from the response... + if len(res.Values) == 0 { + // it's a write + continue + } + if !res.Ok || res.Unknown { + continue + } + if len(res.Keys) != groupSize || len(res.Values) != groupSize { + return false, fmt.Errorf("The read respond should have %v keys and %v values, but it has %v keys and %v values", + groupSize, groupSize, len(res.Keys), len(res.Values)) + } + type pair struct { + key uint64 + value sql.NullInt64 + } + //sort key + pairs := make([]pair, groupSize) + for i := 0; i < groupSize; i++ { + pairs[i] = pair{key: res.Keys[i], value: res.Values[i]} + } + sort.Slice(pairs, func(i, j int) bool { return pairs[i].key < pairs[j].key }) + keys := make([]uint64, groupSize) + values := make([]sql.NullInt64, groupSize) + for i := 0; i < groupSize; i++ { + keys[i] = pairs[i].key + values[i] = pairs[i].value + } + str := fmt.Sprintf("%v", keys) + groups[str] = append(groups[str], values) + keyset[str] = keys + } + for str, results := range groups { + keys := keyset[str] + count := len(results) + for p := 0; p < count; p++ { + for q := p + 1; q < count; q++ { + values1 := results[p] + values2 := results[q] + //compare! + var result int + for i := 0; i < groupSize; i++ { + present1 := values1[i].Valid + present2 := values2[i].Valid + if present1 && !present2 { + if result > 0 { + log.Printf("Detected fork in history, read to %v returns %v and %v", keys, values1, values2) + return false, nil + } + result = -1 + } + if !present1 && present2 { + if result < 0 { + log.Printf("Detected fork in history, read to %v returns %v and %v", keys, values1, values2) + return false, nil + } + result = 1 + } + if present1 && present2 { + if values1[i] != values2[i] { + return false, fmt.Errorf("The key %v was write twice since it had two different values %v and %v", + keys[i], values1[i], values2[i]) + } + } + } + } + } + } + return true, nil +} + +func ensureNoMultipleWritesToOneKey(ops []core.Operation) (bool, error) { + keySet := make(map[uint64]bool) + for _, op := range ops { + if op.Action != core.InvokeOperation { + continue + } + req := op.Data.(lfRequest) + if req.Kind != lfWrite { + continue + } + for _, key := range req.Keys { + if _, prs := keySet[key]; prs { + return false, fmt.Errorf("The key %v was written twice", key) + } + keySet[key] = true + } + } + return true, nil +} + +func (lfChecker) Check(_ core.Model, ops []core.Operation) (bool, error) { + if ok, err := ensureNoMultipleWritesToOneKey(ops); err != nil { + return false, err + } else if !ok { + return false, nil + } + if ok, err := ensureNoLongForks(ops, lfGroupSize); err != nil { + return false, err + } else if !ok { + return false, nil + } + return true, nil +} + +func (lfChecker) Name() string { + return "tidb_long_fork_checker" +} + +// LongForkChecker checks the long fork test history. +func LongForkChecker() core.Checker { + return lfChecker{} +} diff --git a/db/tidb/long_fork_test.go b/db/tidb/long_fork_test.go new file mode 100644 index 0000000..a299317 --- /dev/null +++ b/db/tidb/long_fork_test.go @@ -0,0 +1,44 @@ +package tidb + +import ( + "database/sql" + "testing" + + "github.com/pingcap/chaos/pkg/core" +) + +func TestCheckLongFork(t *testing.T) { + good := []core.Operation{ + core.Operation{Action: core.InvokeOperation, Proc: 0, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 1, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 2, Data: lfRequest{}}, + core.Operation{Action: core.ReturnOperation, Proc: 2, Data: lfResponse{Ok: true, Keys: []uint64{2, 1, 0}, Values: []sql.NullInt64{sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}}}}, + core.Operation{Action: core.ReturnOperation, Proc: 1, Data: lfResponse{Ok: true, Keys: []uint64{0, 1, 2}, Values: []sql.NullInt64{sql.NullInt64{Valid: false}, sql.NullInt64{Valid: false}, sql.NullInt64{Valid: true, Int64: 1}}}}, + core.Operation{Action: core.ReturnOperation, Proc: 0, Data: lfResponse{Ok: true, Keys: []uint64{1, 2, 0}, Values: []sql.NullInt64{sql.NullInt64{Valid: false}, sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}}}}, + core.Operation{Action: core.InvokeOperation, Proc: 3, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 4, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 5, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 6, Data: lfRequest{}}, + core.Operation{Action: core.ReturnOperation, Proc: 5, Data: lfResponse{Ok: true, Keys: []uint64{3, 4, 5}, Values: []sql.NullInt64{sql.NullInt64{Valid: false}, sql.NullInt64{Valid: false}, sql.NullInt64{Valid: false}}}}, + core.Operation{Action: core.ReturnOperation, Proc: 3, Data: lfResponse{Ok: true, Keys: []uint64{5, 4, 3}, Values: []sql.NullInt64{sql.NullInt64{Valid: false}, sql.NullInt64{Valid: false}, sql.NullInt64{Valid: true, Int64: 1}}}}, + core.Operation{Action: core.ReturnOperation, Proc: 4, Data: lfResponse{Ok: true, Keys: []uint64{4, 3, 5}, Values: []sql.NullInt64{sql.NullInt64{Valid: false}, sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}}}}, + core.Operation{Action: core.ReturnOperation, Proc: 6, Data: lfResponse{Ok: true, Keys: []uint64{5, 3, 4}, Values: []sql.NullInt64{sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}}}}, + } + bad := []core.Operation{ + core.Operation{Action: core.InvokeOperation, Proc: 0, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 1, Data: lfRequest{}}, + core.Operation{Action: core.InvokeOperation, Proc: 2, Data: lfRequest{}}, + core.Operation{Action: core.ReturnOperation, Proc: 2, Data: lfResponse{Ok: true, Keys: []uint64{2, 1, 0}, Values: []sql.NullInt64{sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: true, Int64: 1}}}}, + // long fork! + core.Operation{Action: core.ReturnOperation, Proc: 1, Data: lfResponse{Ok: true, Keys: []uint64{0, 1, 2}, Values: []sql.NullInt64{sql.NullInt64{Valid: false}, sql.NullInt64{Valid: false}, sql.NullInt64{Valid: true, Int64: 1}}}}, + core.Operation{Action: core.ReturnOperation, Proc: 0, Data: lfResponse{Ok: true, Keys: []uint64{1, 2, 0}, Values: []sql.NullInt64{sql.NullInt64{Valid: true, Int64: 1}, sql.NullInt64{Valid: false}, sql.NullInt64{Valid: false}}}}, + } + ok, err := ensureNoLongForks(good, 3) + if !ok || err != nil { + t.Fatalf("good must pass check") + } + ok, err = ensureNoLongForks(bad, 3) + if ok { + t.Fatalf("bad must fail check") + } +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 14764f2..9587408 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -42,7 +42,7 @@ func Wget(ctx context.Context, node string, rawURL string, dest string) (string, Mkdir(ctx, node, dest) err = ssh.Exec(ctx, node, "wget", "--tries", "20", "--waitretry", "60", "--retry-connrefused", "--dns-timeout", "60", "--connect-timeout", "60", - "--read-timeout", "60", "--directory-prefix", dest, rawURL) + "--read-timeout", "60", "--no-clobber", "--no-verbose", "--directory-prefix", dest, rawURL) return filePath, err } @@ -69,6 +69,8 @@ func InstallArchive(ctx context.Context, node string, rawURL string, dest string if strings.HasSuffix(name, ".zip") { err = ssh.Exec(ctx, node, "unzip", "-d", tmpDir, name) + } else if strings.HasSuffix(name, ".tar.gz") { + err = ssh.Exec(ctx, node, "tar", "-xzf", name, "-C", tmpDir) } else { err = ssh.Exec(ctx, node, "tar", "-xf", name, "-C", tmpDir) }