Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PWX-4572: added UT to watch multiple updates #31

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
6 changes: 5 additions & 1 deletion consul/kv_consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consul
import (
"os"
"os/exec"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -58,7 +59,10 @@ func Start() error {
}

//consul agent -server -client=0.0.0.0 -data-dir /opt/consul/data -bind 0.0.0.0 -syslog -bootstrap-expect 1 -advertise 127.0.0.1
cmd = exec.Command("consul", "agent", "-server", "-advertise", "127.0.0.1", "-bind", "0.0.0.0", "-data-dir", "/tmp/consul", "-bootstrap-expect", "1")
//sudo consul agent -server -bind=127.0.0.1 -config-dir /etc/consul.d/server -data-dir /var/consul -ui -client=127.0.0.1
s := "sudo consul agent -server -bind=127.0.0.1 -config-dir /etc/consul.d/server -data-dir /var/consul -ui -client=127.0.0.1"
command := strings.Split(s, " ")
cmd = exec.Command(command[0], command[1:]...)
err := cmd.Start()
time.Sleep(5 * time.Second)
return err
Expand Down
3 changes: 1 addition & 2 deletions etcd/v2/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import (
"time"

"github.com/Sirupsen/logrus"
"golang.org/x/net/context"

e "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/portworx/kvdb"
"github.com/portworx/kvdb/common"
ec "github.com/portworx/kvdb/etcd/common"
"github.com/portworx/kvdb/mem"
"golang.org/x/net/context"
)

const (
Expand Down
10 changes: 3 additions & 7 deletions etcd/v3/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,19 @@ import (
"sync"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/Sirupsen/logrus"

"golang.org/x/net/context"

e "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/mvcc/mvccpb"

"github.com/portworx/kvdb"
"github.com/portworx/kvdb/common"
ec "github.com/portworx/kvdb/etcd/common"
"github.com/portworx/kvdb/mem"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down
8 changes: 3 additions & 5 deletions etcd/v3/kv_etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ package etcdv3
import (
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/portworx/kvdb"
"github.com/portworx/kvdb/etcd/common"
"github.com/portworx/kvdb/test"
"github.com/stretchr/testify/assert"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestAll(t *testing.T) {
Expand Down
179 changes: 158 additions & 21 deletions test/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand"
"path/filepath"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -62,27 +63,30 @@ func Run(datastoreInit kvdb.DatastoreInit, t *testing.T, start StartKvdb, stop S
if err != nil {
t.Fatalf(err.Error())
}
create(kv, t)
createWithTTL(kv, t)
cas(kv, t)
cad(kv, t)
snapshot(kv, t)
get(kv, t)
getInterface(kv, t)
update(kv, t)
deleteKey(kv, t)
deleteTree(kv, t)
enumerate(kv, t)
keys(kv, t)
concurrentEnum(kv, t)
watchKey(kv, t)
watchTree(kv, t)
watchWithIndex(kv, t)
collect(kv, t)
lockBasic(kv, t)
lock(kv, t)
lockBetweenRestarts(kv, t, start, stop)
serialization(kv, t)
createUpdateDeleteWatchInALoopAsync(kv, t)
/*
create(kv, t)
createWithTTL(kv, t)
cas(kv, t)
cad(kv, t)
snapshot(kv, t)
get(kv, t)
getInterface(kv, t)
update(kv, t)
deleteKey(kv, t)
deleteTree(kv, t)
enumerate(kv, t)
keys(kv, t)
concurrentEnum(kv, t)
watchKey(kv, t)
watchTree(kv, t)
watchWithIndex(kv, t)
collect(kv, t)
lockBasic(kv, t)
lock(kv, t)
lockBetweenRestarts(kv, t, start, stop)
serialization(kv, t)
*/
err = stop()
assert.NoError(t, err, "Unable to stop kvdb")
}
Expand Down Expand Up @@ -164,6 +168,139 @@ func RunAuth(datastoreInit kvdb.DatastoreInit, t *testing.T) {
removeUser(kv, t)
}

func createUpdateDeleteWatchInALoop(kv kvdb.Kvdb, t *testing.T) {
prefix := "/test"
key := "myKey"
n := 1000
ch := make(chan uint64)
timeout := time.Second * 45

if err := kv.DeleteTree(prefix); err != nil {
t.Fatal(err)
}

if err := kv.WatchTree(prefix, 0, nil, watcherCreator("watcher0", ch, timeout)); err != nil {
t.Fatal(err)
}

for i := 0; i < n; i++ {
if kvp, err := kv.Put(filepath.Join(prefix, key), i, 0); err != nil {
t.Fatal(err)
} else {
if err := validateModIndex("put", kvp.ModifiedIndex, ch, timeout); err != nil {
t.Fatal(err)
}
}
if kvp, err := kv.Update(filepath.Join(prefix, key), i*10, 0); err != nil {
t.Fatal(err)
} else {
if err := validateModIndex("update", kvp.ModifiedIndex, ch, timeout); err != nil {
t.Fatal(err)
}
}
if kvp, err := kv.Delete(filepath.Join(prefix, key)); err != nil {
t.Fatal(err)
} else {
// here we add +1 because delete returns the previous kv pair
if err := validateModIndex("delete", kvp.ModifiedIndex+1, ch, timeout); err != nil {
t.Fatal(err)
}
}
}
}

func createUpdateDeleteWatchInALoopAsync(kv kvdb.Kvdb, t *testing.T) {
prefix := "/test"
key := "myKey"
n := 1000
ch := make(chan uint64)
done := make(chan struct{})
timeout := time.Second * 45
indexMap := make(map[uint64]int)
mismatchMap := make(map[uint64]int)

if err := kv.DeleteTree(prefix); err != nil {
t.Fatal(err)
}

if err := kv.WatchTree(prefix, 0, nil, watcherCreator("watcher0", ch, timeout)); err != nil {
t.Fatal(err)
}

// collect indices in map. their count should be 2 each
go func(c chan uint64, done chan struct{}, m map[uint64]int) {
for i := 0; i < 2*n; i++ {
indexMap[<-c] += 1
}
done <- struct{}{}
}(ch, done, indexMap)

for i := 0; i < n; i++ {
if kvp, err := kv.Put(filepath.Join(prefix, key), i, 0); err != nil {
t.Fatal(err)
} else {
go func(c chan uint64, index uint64) {
c <- index
}(ch, kvp.ModifiedIndex)
}
if kvp, err := kv.Update(filepath.Join(prefix, key), i*10, 0); err != nil {
t.Fatal(err)
} else {
go func(c chan uint64, index uint64) {
c <- index
}(ch, kvp.ModifiedIndex)
}
if kvp, err := kv.Delete(filepath.Join(prefix, key)); err != nil {
t.Fatal(err)
} else {
// here we add +1 because delete returns the previous kv pair
go func(c chan uint64, index uint64) {
c <- index
}(ch, kvp.ModifiedIndex+1)
}
}

// wait for processes to finish
select {
case <-done:
for key, val := range indexMap {
if val != 2 {
mismatchMap[key] = val
}
}

if len(mismatchMap) > 0 {
t.Log("expected count of each of the mod indices to be == 2. Total mismatch:",
len(mismatchMap), "/", len(indexMap))
}
case <-time.After(timeout):
t.Fatal("timeout occurred")
}
}

func validateModIndex(name string, modIndexA uint64, ch chan uint64, timeout time.Duration) error {
select {
case modIndexB := <-ch:
if modIndexA != modIndexB {
return fmt.Errorf("%s: %s %v %s %v", name, "expected", modIndexA, "received in watch func", modIndexB)
}
case <-time.After(timeout):
return fmt.Errorf("%s: %s", name, "timeout... updates to watch func taking too long")
}
return nil
}

func watcherCreator(name string, ch chan uint64, timeout time.Duration) func(prefix string, opaque interface{}, kvp *kvdb.KVPair, err error) error {
return func(prefix string, opaque interface{}, kvp *kvdb.KVPair, err error) error {
select {
case ch <- kvp.ModifiedIndex:
case <-time.After(timeout):
return fmt.Errorf("timeout... updates to watch func taking too long")
}
return nil
}
}

func get(kv kvdb.Kvdb, t *testing.T) {
fmt.Println("get")

Expand Down