diff --git a/cmd/rawkv/main.go b/cmd/rawkv/main.go index 97a053a..4e29ac9 100644 --- a/cmd/rawkv/main.go +++ b/cmd/rawkv/main.go @@ -36,9 +36,11 @@ func main() { } var creator core.ClientCreator + var gen control.Generator switch *clientCase { case "register": creator = rawkv.RegisterClientCreator{} + gen = rawkv.RegisterGenRequest default: log.Fatalf("invalid client test case %s", *clientCase) } @@ -51,6 +53,7 @@ func main() { suit := util.Suit{ Config: &cfg, ClientCreator: creator, + Generator: gen, Nemesises: *nemesises, VerifySuit: verifySuit, } diff --git a/cmd/tidb/main.go b/cmd/tidb/main.go index 7f29ed4..c3063d4 100644 --- a/cmd/tidb/main.go +++ b/cmd/tidb/main.go @@ -36,11 +36,14 @@ func main() { } var creator core.ClientCreator + var gen control.Generator switch *clientCase { case "bank": creator = tidb.BankClientCreator{} + gen = tidb.BankGenRequest case "multi_bank": creator = tidb.MultiBankClientCreator{} + gen = tidb.BankGenRequest default: log.Fatalf("invalid client test case %s", *clientCase) } @@ -63,6 +66,7 @@ func main() { suit := util.Suit{ Config: &cfg, ClientCreator: creator, + Generator: gen, Nemesises: *nemesises, VerifySuit: verifySuit, } diff --git a/cmd/txnkv/main.go b/cmd/txnkv/main.go index 0faf64b..203ed11 100644 --- a/cmd/txnkv/main.go +++ b/cmd/txnkv/main.go @@ -36,9 +36,11 @@ func main() { } var creator core.ClientCreator + var gen control.Generator switch *clientCase { case "register": creator = txnkv.RegisterClientCreator{} + gen = txnkv.RegisterGenRequest default: log.Fatalf("invalid client test case %s", *clientCase) } @@ -51,6 +53,7 @@ func main() { suit := util.Suit{ Config: &cfg, ClientCreator: creator, + Generator: gen, Nemesises: *nemesises, VerifySuit: verifySuit, } diff --git a/cmd/util/suit.go b/cmd/util/suit.go index b95a674..f2fddbf 100644 --- a/cmd/util/suit.go +++ b/cmd/util/suit.go @@ -19,6 +19,7 @@ import ( type Suit struct { *control.Config core.ClientCreator + control.Generator // nemesis, seperated by comma. Nemesises string @@ -63,6 +64,7 @@ func (suit *Suit) Run(ctx context.Context, nodes []string) { sctx, suit.Config, suit.ClientCreator, + suit.Generator, nemesisGens, suit.VerifySuit, ) diff --git a/db/rawkv/register.go b/db/rawkv/register.go index 7534573..fa63736 100644 --- a/db/rawkv/register.go +++ b/db/rawkv/register.go @@ -13,6 +13,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/chaos/pkg/core" "github.com/pingcap/chaos/pkg/model" + "github.com/pingcap/chaos/pkg/control" ) var ( @@ -76,18 +77,6 @@ func (c *registerClient) Invoke(ctx context.Context, node string, r interface{}) return model.RegisterResponse{} } -func (c *registerClient) NextRequest() interface{} { - r := model.RegisterRequest{ - Op: c.r.Intn(2) == 1, - } - if r.Op == model.RegisterRead { - return r - } - - r.Value = int(c.r.Int63()) - return r -} - // DumpState the database state(also the model's state) func (c *registerClient) DumpState(ctx context.Context) (interface{}, error) { val, err := c.db.Get(register) @@ -117,3 +106,15 @@ type RegisterClientCreator struct { func (RegisterClientCreator) Create(node string) core.Client { return ®isterClient{} } + +func RegisterGenRequest(*control.Config, int64) interface{} { + r := model.RegisterRequest{ + Op: rand.Intn(2) == 1, + } + if r.Op == model.RegisterRead { + return r + } + + r.Value = int(rand.Int63()) + return r +} diff --git a/db/tidb/bank.go b/db/tidb/bank.go index 1fa580e..311183e 100644 --- a/db/tidb/bank.go +++ b/db/tidb/bank.go @@ -13,6 +13,7 @@ import ( "github.com/anishathalye/porcupine" pchecker "github.com/pingcap/chaos/pkg/check/porcupine" "github.com/pingcap/chaos/pkg/core" + "github.com/pingcap/chaos/pkg/control" "github.com/pingcap/chaos/pkg/history" // use mysql @@ -148,25 +149,6 @@ func (c *bankClient) Invoke(ctx context.Context, node string, r interface{}) int return bankResponse{Ok: true, Tso: tso, FromBalance: fromBalance, ToBalance: toBalance} } -func (c *bankClient) NextRequest() interface{} { - r := bankRequest{ - Op: c.r.Int() % 2, - } - if r.Op == 0 { - return r - } - - r.From = c.r.Intn(c.accountNum) - - r.To = c.r.Intn(c.accountNum) - if r.From == r.To { - r.To = (r.To + 1) % c.accountNum - } - - r.Amount = 5 - return r -} - func (c *bankClient) DumpState(ctx context.Context) (interface{}, error) { txn, err := c.db.Begin() @@ -212,6 +194,25 @@ type bankRequest struct { Amount int64 } +func BankGenRequest(*control.Config, int64) interface{} { + r := bankRequest{ + Op: rand.Int() % 2, + } + if r.Op == 0 { + return r + } + + r.From = rand.Intn(accountNum) + + r.To = rand.Intn(accountNum) + if r.From == r.To { + r.To = (r.To + 1) % accountNum + } + + r.Amount = 5 + return r +} + type bankResponse struct { // Transaction start timestamp Tso uint64 diff --git a/db/tidb/multi_bank.go b/db/tidb/multi_bank.go index 9f4c69c..4a7b30f 100644 --- a/db/tidb/multi_bank.go +++ b/db/tidb/multi_bank.go @@ -133,25 +133,6 @@ func (c *multiBankClient) Invoke(ctx context.Context, node string, r interface{} return bankResponse{Ok: true, Tso: tso, FromBalance: fromBalance, ToBalance: toBalance} } -func (c *multiBankClient) NextRequest() interface{} { - r := bankRequest{ - Op: c.r.Int() % 2, - } - if r.Op == 0 { - return r - } - - r.From = c.r.Intn(c.accountNum) - - r.To = c.r.Intn(c.accountNum) - if r.From == r.To { - r.To = (r.To + 1) % c.accountNum - } - - r.Amount = 5 - return r -} - // DumpState the database state(also the model's state) func (c *multiBankClient) DumpState(ctx context.Context) (interface{}, error) { txn, err := c.db.Begin() diff --git a/db/txnkv/register.go b/db/txnkv/register.go index bed2a8e..dc394c6 100644 --- a/db/txnkv/register.go +++ b/db/txnkv/register.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/chaos/pkg/core" "github.com/pingcap/chaos/pkg/model" + "github.com/pingcap/chaos/pkg/control" ) var ( @@ -122,18 +123,6 @@ func (c *registerClient) Invoke(ctx context.Context, node string, r interface{}) return c.invokeWrite(ctx, arg) } -func (c *registerClient) NextRequest() interface{} { - r := model.RegisterRequest{ - Op: c.r.Intn(2) == 1, - } - if r.Op == model.RegisterRead { - return r - } - - r.Value = int(c.r.Int63()) - return r -} - // DumpState the database state(also the model's state) func (c *registerClient) DumpState(ctx context.Context) (interface{}, error) { tx, err := c.db.Begin() @@ -174,3 +163,15 @@ type RegisterClientCreator struct { func (RegisterClientCreator) Create(node string) core.Client { return ®isterClient{} } + +func RegisterGenRequest(*control.Config, int64) interface{} { + r := model.RegisterRequest{ + Op: rand.Intn(2) == 1, + } + if r.Op == model.RegisterRead { + return r + } + + r.Value = int(rand.Int63()) + return r +} diff --git a/pkg/control/control.go b/pkg/control/control.go index 5a95d08..075eb23 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -14,9 +14,6 @@ import ( // register nemesis _ "github.com/pingcap/chaos/pkg/nemesis" - - // register tidb - _ "github.com/pingcap/chaos/db/tidb" ) // Controller controls the whole cluster. It sends request to the database, @@ -25,7 +22,8 @@ import ( type Controller struct { cfg *Config - clients []core.Client + clients []core.Client + clientGenerator Generator nemesisGenerators []core.NemesisGenerator @@ -43,6 +41,7 @@ func NewController( ctx context.Context, cfg *Config, clientCreator core.ClientCreator, + clientGenerator Generator, nemesisGenerators []core.NemesisGenerator, verifySuit verify.Suit, ) *Controller { @@ -59,6 +58,7 @@ func NewController( c := new(Controller) c.cfg = cfg c.ctx, c.cancel = context.WithCancel(ctx) + c.clientGenerator = clientGenerator c.nemesisGenerators = nemesisGenerators c.suit = verifySuit @@ -235,7 +235,7 @@ func (c *Controller) onClientLoop( procID := atomic.AddInt64(&c.proc, 1) for atomic.AddInt64(requestCount, -1) >= 0 { - request := client.NextRequest() + request := c.clientGenerator(c.cfg, procID) if err := recorder.RecordRequest(procID, request); err != nil { log.Fatalf("record request %v failed %v", request, err) diff --git a/pkg/control/generator.go b/pkg/control/generator.go new file mode 100644 index 0000000..94dd408 --- /dev/null +++ b/pkg/control/generator.go @@ -0,0 +1,4 @@ +package control + +// Generator generates a series of operations +type Generator = func(*Config, int64) interface{} diff --git a/pkg/core/client.go b/pkg/core/client.go index f4c2a28..0753703 100644 --- a/pkg/core/client.go +++ b/pkg/core/client.go @@ -21,8 +21,6 @@ type Client interface { // Invoke invokes a request to the database. // Mostly, the return Response should implement UnknownResponse interface Invoke(ctx context.Context, node string, r interface{}) interface{} - // NextRequest generates a request for latter Invoke. - NextRequest() interface{} // DumpState the database state(also the model's state) DumpState(ctx context.Context) (interface{}, error) } @@ -58,11 +56,6 @@ func (noopClient) Invoke(ctx context.Context, node string, r interface{}) interf return nil } -// NextRequest generates a request for latter Invoke. -func (noopClient) NextRequest() interface{} { - return nil -} - // DumpState the database state(also the model's state) func (noopClient) DumpState(ctx context.Context) (interface{}, error) { return nil, nil diff --git a/pkg/generator/gen.go b/pkg/generator/gen.go new file mode 100644 index 0000000..d78978c --- /dev/null +++ b/pkg/generator/gen.go @@ -0,0 +1,47 @@ +package generator + +import ( + "math/rand" + "time" + + "github.com/pingcap/chaos/pkg/control" +) + +type PairedGenerator struct { + key interface{} + gen control.Generator +} + +// Reserve takes a series of count, generator pairs, and a final default +// generator. +// (reserve 5 write 10 cas read) +// The first 5 threads will call the `write` generator, the next 10 will emit +// CAS operations, and the remaining threads will perform reads. This is +// particularly useful when you want to ensure that two classes of operations +// have a chance to proceed concurrently--for instance, if writes begin +// blocking, you might like reads to proceed concurrently without every thread +// getting tied up in a write. +func Reserve(final control.Generator, gens ...PairedGenerator) control.Generator { + return func(cfg *control.Config, proc int64) interface{} { + thread := (proc - 1) % len(cfg.Nodes) + cnt := 0 + for _, pair := range gens { + n := pair.key.(int) + if thread >= cnt && thread < cnt + n { + return pair.gen(cfg, proc) + } + cnt += n + } + return final(cfg, proc) + } +} + +// Stagger introduces uniform random timing noise with a mean delay of +// dt duration for every operation. Delays range from 0 to 2 * dt." +func Stagger(dt time.Duration, gen control.Generator) control.Generator { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + return func(cfg *control.Config, proc int64) interface{} { + time.Sleep(time.Duration(r.Int63n(2 * int64(dt)))) + return gen(cfg, proc) + } +}