From c05aace6bdfcbb57c2d499670bcfa1131b187cb5 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sun, 15 Jul 2018 11:11:29 +0800 Subject: [PATCH] *: add txnkv (#14) --- Makefile | 5 +- cmd/txnkv/main.go | 48 ++++++++++++++ db/rawkv/register.go | 3 - db/txnkv/db.go | 23 +++++++ db/txnkv/register.go | 152 +++++++++++++++++++++++++++++++++++++++++++ scripts/test.sh | 7 ++ 6 files changed, 234 insertions(+), 4 deletions(-) create mode 100644 cmd/txnkv/main.go create mode 100644 db/txnkv/db.go create mode 100644 db/txnkv/register.go diff --git a/Makefile b/Makefile index e1c5987..edea51f 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ all: build build: chaos verifier -chaos: rawkv tidb +chaos: rawkv tidb txnkv tidb: go build -o bin/chaos-tidb cmd/tidb/main.go @@ -12,5 +12,8 @@ tidb: rawkv: go build -o bin/chaos-rawkv cmd/rawkv/main.go +txnkv: + go build -o bin/chaos-txnkv cmd/txnkv/main.go + verifier: go build -o bin/chaos-verifier cmd/verifier/main.go \ No newline at end of file diff --git a/cmd/txnkv/main.go b/cmd/txnkv/main.go new file mode 100644 index 0000000..a8a86ee --- /dev/null +++ b/cmd/txnkv/main.go @@ -0,0 +1,48 @@ +package main + +import ( + "flag" + "log" + "time" + + "github.com/siddontang/chaos/cmd/util" + "github.com/siddontang/chaos/db/txnkv" + "github.com/siddontang/chaos/pkg/control" + "github.com/siddontang/chaos/pkg/core" +) + +var ( + requestCount = flag.Int("request-count", 500, "client test request count") + runTime = flag.Duration("run-time", 10*time.Minute, "client test run time") + clientCase = flag.String("case", "register", "client test case, like bank,multi_bank") + historyFile = flag.String("history", "./history.log", "history file") + nemesises = flag.String("nemesis", "", "nemesis, seperated by name, like random_kill,all_kill") + verifyNames = flag.String("verifiers", "", "verifier names, seperate by comma, txnkv_register") +) + +func main() { + flag.Parse() + + cfg := control.Config{ + DB: "txnkv", + RequestCount: *requestCount, + RunTime: *runTime, + History: *historyFile, + } + + var creator core.ClientCreator + switch *clientCase { + case "register": + creator = txnkv.RegisterClientCreator{} + default: + log.Fatalf("invalid client test case %s", *clientCase) + } + + suit := util.Suit{ + Config: cfg, + ClientCreator: creator, + VerifyNames: *verifyNames, + Nemesises: *nemesises, + } + suit.Run() +} diff --git a/db/rawkv/register.go b/db/rawkv/register.go index 88043da..8e76904 100644 --- a/db/rawkv/register.go +++ b/db/rawkv/register.go @@ -11,9 +11,6 @@ import ( "github.com/anishathalye/porcupine" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv" - - // use mysql - _ "github.com/go-sql-driver/mysql" "github.com/siddontang/chaos/pkg/core" "github.com/siddontang/chaos/pkg/model" ) diff --git a/db/txnkv/db.go b/db/txnkv/db.go new file mode 100644 index 0000000..a7a73c8 --- /dev/null +++ b/db/txnkv/db.go @@ -0,0 +1,23 @@ +package txnkv + +import ( + "github.com/siddontang/chaos/db/cluster" + "github.com/siddontang/chaos/pkg/core" +) + +// db is the transactional KV database. +type db struct { + cluster.Cluster +} + +// Name returns the unique name for the database +func (db *db) Name() string { + return "txnkv" +} + +func init() { + core.RegisterDB(&db{ + // TxnKV does not use TiDB. + cluster.Cluster{IncludeTidb: false}, + }) +} diff --git a/db/txnkv/register.go b/db/txnkv/register.go new file mode 100644 index 0000000..0fbbbb1 --- /dev/null +++ b/db/txnkv/register.go @@ -0,0 +1,152 @@ +package txnkv + +import ( + "context" + "fmt" + "log" + "math/rand" + "strconv" + "sync" + "time" + + "github.com/anishathalye/porcupine" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" + "github.com/siddontang/chaos/pkg/core" + "github.com/siddontang/chaos/pkg/model" +) + +var ( + register = []byte("acc") + + closeOnce = sync.Once{} +) + +type registerClient struct { + db kv.Storage + r *rand.Rand +} + +func (c *registerClient) SetUp(ctx context.Context, nodes []string, node string) error { + c.r = rand.New(rand.NewSource(time.Now().UnixNano())) + tikv.MaxConnectionCount = 128 + driver := tikv.Driver{} + db, err := driver.Open(fmt.Sprintf("tikv://%s:2379?disableGC=true", node)) + if err != nil { + return err + } + + c.db = db + + // Do SetUp in the first node + if node != nodes[0] { + return nil + } + + log.Printf("begin to initial register on node %s", node) + + tx, err := db.Begin() + if err != nil { + return err + } + + defer tx.Rollback() + + if err = tx.Set(register, []byte("0")); err != nil { + return err + } + + return tx.Commit(ctx) +} + +func (c *registerClient) TearDown(ctx context.Context, nodes []string, node string) error { + var err error + closeOnce.Do(func() { + // It's a workaround for `panic: close of closed channel`. + // `tikv.Driver.Open` will open the same instance if cluster id is + // the same. + // + // See more: https://github.com/pingcap/tidb/blob/ + // 63c4562c27ad43165e6a0d5f890f33f3b1002b3f/store/tikv/kv.go#L95 + err = c.db.Close() + }) + return err +} + +func (c *registerClient) invokeRead(ctx context.Context, r model.RegisterRequest) model.RegisterResponse { + tx, err := c.db.Begin() + if err != nil { + return model.RegisterResponse{Unknown: true} + } + defer tx.Rollback() + + val, err := tx.Get(register) + if err != nil { + return model.RegisterResponse{Unknown: true} + } + + if err = tx.Commit(ctx); err != nil { + return model.RegisterResponse{Unknown: true} + } + + v, err := strconv.ParseInt(string(val), 10, 64) + if err != nil { + return model.RegisterResponse{Unknown: true} + } + return model.RegisterResponse{Value: int(v)} +} + +func (c *registerClient) invokeWrite(ctx context.Context, r model.RegisterRequest) model.RegisterResponse { + tx, err := c.db.Begin() + if err != nil { + return model.RegisterResponse{Unknown: true} + } + defer tx.Rollback() + + val := fmt.Sprintf("%d", r.Value) + if err = tx.Set(register, []byte(val)); err != nil { + return model.RegisterResponse{Unknown: true} + } + + if err = tx.Commit(ctx); err != nil { + return model.RegisterResponse{Unknown: true} + } + return model.RegisterResponse{} +} + +func (c *registerClient) Invoke(ctx context.Context, node string, r interface{}) interface{} { + arg := r.(model.RegisterRequest) + if arg.Op == model.RegisterRead { + return c.invokeRead(ctx, arg) + } + return c.invokeWrite(ctx, arg) +} + +func (c *registerClient) NextRequest() interface{} { + r := model.RegisterRequest{ + Op: c.r.Intn(2) == 1, + } + if r.Op == model.RegisterRead { + return r + } + + r.Value = int(c.r.Int63()) + return r +} + +func newRegisterEvent(v interface{}, id uint) porcupine.Event { + if _, ok := v.(model.RegisterRequest); ok { + return porcupine.Event{Kind: porcupine.CallEvent, Value: v, Id: id} + } + + return porcupine.Event{Kind: porcupine.ReturnEvent, Value: v, Id: id} +} + +// RegisterClientCreator creates a register test client for txnkv. +type RegisterClientCreator struct { +} + +// Create creates a client. +func (RegisterClientCreator) Create(node string) core.Client { + return ®isterClient{} +} diff --git a/scripts/test.sh b/scripts/test.sh index 6c77e9b..1c5cfbe 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -15,6 +15,12 @@ for bin in $@; do nemeses=( random_kill ) verifiers=( register ) ;; + 'txnkv' ) + suit=chaos-txnkv + cases=( register ) + nemeses=( random_kill ) + verifiers=( register ) + ;; '--help' ) HELP=1 ;; @@ -30,6 +36,7 @@ if [ "$HELP" ]; then echo "usage: $0 [OPTION]" echo " tidb Chaos test TiDB" echo " rawkv Chaos test RawKV" + echo " txnkv Chaos test TxnKV" echo " --help Display this message" exit 0 fi