diff --git a/consul/kv_consul_test.go b/consul/kv_consul_test.go index 510f5c85..f11cb81a 100644 --- a/consul/kv_consul_test.go +++ b/consul/kv_consul_test.go @@ -3,6 +3,7 @@ package consul import ( "os" "os/exec" + "strings" "testing" "time" @@ -58,7 +59,10 @@ func Start() error { } //consul agent -server -client=0.0.0.0 -data-dir /opt/consul/data -bind 0.0.0.0 -syslog -bootstrap-expect 1 -advertise 127.0.0.1 - cmd = exec.Command("consul", "agent", "-server", "-advertise", "127.0.0.1", "-bind", "0.0.0.0", "-data-dir", "/tmp/consul", "-bootstrap-expect", "1") + //sudo consul agent -server -bind=127.0.0.1 -config-dir /etc/consul.d/server -data-dir /var/consul -ui -client=127.0.0.1 + s := "sudo consul agent -server -bind=127.0.0.1 -config-dir /etc/consul.d/server -data-dir /var/consul -ui -client=127.0.0.1" + command := strings.Split(s, " ") + cmd = exec.Command(command[0], command[1:]...) err := cmd.Start() time.Sleep(5 * time.Second) return err diff --git a/etcd/v2/kv_etcd.go b/etcd/v2/kv_etcd.go index 3c09882d..c7afc609 100644 --- a/etcd/v2/kv_etcd.go +++ b/etcd/v2/kv_etcd.go @@ -11,14 +11,13 @@ import ( "time" "github.com/Sirupsen/logrus" - "golang.org/x/net/context" - e "github.com/coreos/etcd/client" "github.com/coreos/etcd/pkg/transport" "github.com/portworx/kvdb" "github.com/portworx/kvdb/common" ec "github.com/portworx/kvdb/etcd/common" "github.com/portworx/kvdb/mem" + "golang.org/x/net/context" ) const ( diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index fb521623..ac2df613 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -10,23 +10,19 @@ import ( "sync" "time" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/Sirupsen/logrus" - - "golang.org/x/net/context" - e "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/portworx/kvdb" "github.com/portworx/kvdb/common" ec "github.com/portworx/kvdb/etcd/common" "github.com/portworx/kvdb/mem" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( diff --git a/etcd/v3/kv_etcd_test.go b/etcd/v3/kv_etcd_test.go index ec2be69d..83b0b546 100644 --- a/etcd/v3/kv_etcd_test.go +++ b/etcd/v3/kv_etcd_test.go @@ -3,18 +3,16 @@ package etcdv3 import ( "testing" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/portworx/kvdb" "github.com/portworx/kvdb/etcd/common" "github.com/portworx/kvdb/test" "github.com/stretchr/testify/assert" - "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestAll(t *testing.T) { diff --git a/test/kv.go b/test/kv.go index 6461b4db..b65d1ad1 100644 --- a/test/kv.go +++ b/test/kv.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/rand" + "path/filepath" "sort" "strconv" "strings" @@ -62,27 +63,30 @@ func Run(datastoreInit kvdb.DatastoreInit, t *testing.T, start StartKvdb, stop S if err != nil { t.Fatalf(err.Error()) } - create(kv, t) - createWithTTL(kv, t) - cas(kv, t) - cad(kv, t) - snapshot(kv, t) - get(kv, t) - getInterface(kv, t) - update(kv, t) - deleteKey(kv, t) - deleteTree(kv, t) - enumerate(kv, t) - keys(kv, t) - concurrentEnum(kv, t) - watchKey(kv, t) - watchTree(kv, t) - watchWithIndex(kv, t) - collect(kv, t) - lockBasic(kv, t) - lock(kv, t) - lockBetweenRestarts(kv, t, start, stop) - serialization(kv, t) + createUpdateDeleteWatchInALoopAsync(kv, t) + /* + create(kv, t) + createWithTTL(kv, t) + cas(kv, t) + cad(kv, t) + snapshot(kv, t) + get(kv, t) + getInterface(kv, t) + update(kv, t) + deleteKey(kv, t) + deleteTree(kv, t) + enumerate(kv, t) + keys(kv, t) + concurrentEnum(kv, t) + watchKey(kv, t) + watchTree(kv, t) + watchWithIndex(kv, t) + collect(kv, t) + lockBasic(kv, t) + lock(kv, t) + lockBetweenRestarts(kv, t, start, stop) + serialization(kv, t) + */ err = stop() assert.NoError(t, err, "Unable to stop kvdb") } @@ -164,6 +168,139 @@ func RunAuth(datastoreInit kvdb.DatastoreInit, t *testing.T) { removeUser(kv, t) } +func createUpdateDeleteWatchInALoop(kv kvdb.Kvdb, t *testing.T) { + prefix := "/test" + key := "myKey" + n := 1000 + ch := make(chan uint64) + timeout := time.Second * 45 + + if err := kv.DeleteTree(prefix); err != nil { + t.Fatal(err) + } + + if err := kv.WatchTree(prefix, 0, nil, watcherCreator("watcher0", ch, timeout)); err != nil { + t.Fatal(err) + } + + for i := 0; i < n; i++ { + if kvp, err := kv.Put(filepath.Join(prefix, key), i, 0); err != nil { + t.Fatal(err) + } else { + if err := validateModIndex("put", kvp.ModifiedIndex, ch, timeout); err != nil { + t.Fatal(err) + } + } + if kvp, err := kv.Update(filepath.Join(prefix, key), i*10, 0); err != nil { + t.Fatal(err) + } else { + if err := validateModIndex("update", kvp.ModifiedIndex, ch, timeout); err != nil { + t.Fatal(err) + } + } + if kvp, err := kv.Delete(filepath.Join(prefix, key)); err != nil { + t.Fatal(err) + } else { + // here we add +1 because delete returns the previous kv pair + if err := validateModIndex("delete", kvp.ModifiedIndex+1, ch, timeout); err != nil { + t.Fatal(err) + } + } + } +} + +func createUpdateDeleteWatchInALoopAsync(kv kvdb.Kvdb, t *testing.T) { + prefix := "/test" + key := "myKey" + n := 1000 + ch := make(chan uint64) + done := make(chan struct{}) + timeout := time.Second * 45 + indexMap := make(map[uint64]int) + mismatchMap := make(map[uint64]int) + + if err := kv.DeleteTree(prefix); err != nil { + t.Fatal(err) + } + + if err := kv.WatchTree(prefix, 0, nil, watcherCreator("watcher0", ch, timeout)); err != nil { + t.Fatal(err) + } + + // collect indices in map. their count should be 2 each + go func(c chan uint64, done chan struct{}, m map[uint64]int) { + for i := 0; i < 2*n; i++ { + indexMap[<-c] += 1 + } + done <- struct{}{} + }(ch, done, indexMap) + + for i := 0; i < n; i++ { + if kvp, err := kv.Put(filepath.Join(prefix, key), i, 0); err != nil { + t.Fatal(err) + } else { + go func(c chan uint64, index uint64) { + c <- index + }(ch, kvp.ModifiedIndex) + } + if kvp, err := kv.Update(filepath.Join(prefix, key), i*10, 0); err != nil { + t.Fatal(err) + } else { + go func(c chan uint64, index uint64) { + c <- index + }(ch, kvp.ModifiedIndex) + } + if kvp, err := kv.Delete(filepath.Join(prefix, key)); err != nil { + t.Fatal(err) + } else { + // here we add +1 because delete returns the previous kv pair + go func(c chan uint64, index uint64) { + c <- index + }(ch, kvp.ModifiedIndex+1) + } + } + + // wait for processes to finish + select { + case <-done: + for key, val := range indexMap { + if val != 2 { + mismatchMap[key] = val + } + } + + if len(mismatchMap) > 0 { + t.Log("expected count of each of the mod indices to be == 2. Total mismatch:", + len(mismatchMap), "/", len(indexMap)) + } + case <-time.After(timeout): + t.Fatal("timeout occurred") + } +} + +func validateModIndex(name string, modIndexA uint64, ch chan uint64, timeout time.Duration) error { + select { + case modIndexB := <-ch: + if modIndexA != modIndexB { + return fmt.Errorf("%s: %s %v %s %v", name, "expected", modIndexA, "received in watch func", modIndexB) + } + case <-time.After(timeout): + return fmt.Errorf("%s: %s", name, "timeout... updates to watch func taking too long") + } + return nil +} + +func watcherCreator(name string, ch chan uint64, timeout time.Duration) func(prefix string, opaque interface{}, kvp *kvdb.KVPair, err error) error { + return func(prefix string, opaque interface{}, kvp *kvdb.KVPair, err error) error { + select { + case ch <- kvp.ModifiedIndex: + case <-time.After(timeout): + return fmt.Errorf("timeout... updates to watch func taking too long") + } + return nil + } +} + func get(kv kvdb.Kvdb, t *testing.T) { fmt.Println("get")