Skip to content

Commit

Permalink
SearchValue tests
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 1, 2018
1 parent 64d025d commit 2822c1c
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 27 deletions.
49 changes: 28 additions & 21 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,17 @@ type PubsubValueStore struct {
// This could be greatly simplified if the pubsub implementation handled bootstrap itself
func NewPubsubValueStore(ctx context.Context, host p2phost.Host, cr routing.ContentRouting, ps *floodsub.PubSub, validator record.Validator) *PubsubValueStore {
return &PubsubValueStore{
ctx: ctx,
ds: dssync.MutexWrap(ds.NewMapDatastore()),
host: host, // needed for pubsub bootstrap
cr: cr, // needed for pubsub bootstrap
ps: ps,
ctx: ctx,

ds: dssync.MutexWrap(ds.NewMapDatastore()),
host: host, // needed for pubsub bootstrap
cr: cr, // needed for pubsub bootstrap
ps: ps,

subs: make(map[string]*floodsub.Subscription),
watching: make(map[string]*watchGroup),

Validator: validator,
subs: make(map[string]*floodsub.Subscription),
}
}

Expand Down Expand Up @@ -186,6 +190,7 @@ func (p *PubsubValueStore) SearchValue(ctx context.Context, key string, opts ...
closing: make(chan struct{}),
listeners: map[int64]chan<- []byte{},
}
p.watching[key] = wg
}
// Lock searchgroup before checking local storage so we don't miss updates
wg.lk.Lock()
Expand All @@ -199,7 +204,7 @@ func (p *PubsubValueStore) SearchValue(ctx context.Context, key string, opts ...

p.watchLk.Lock()
wg.add(ctx, out)
p.watchLk.Lock()
p.watchLk.Unlock()
wg.lk.Unlock()

return out, nil
Expand Down Expand Up @@ -349,22 +354,24 @@ func (wg *watchGroup) add(ctx context.Context, outCh chan []byte) {
wg.lk.Unlock()
}()

select {
case val, ok := <-proxy:
if !ok {
return
}

// outCh is buffered, so we just put the value or swap it for the newer one
for {
select {
case outCh <- val:
case <-outCh:
outCh <- val
case val, ok := <-proxy:
if !ok {
return
}

// outCh is buffered, so we just put the value or swap it for the newer one
select {
case outCh <- val:
case <-outCh:
outCh <- val
}
case <-wg.closing:
return
case <-ctx.Done():
return
}
case <-wg.closing:
return
case <-ctx.Done():
return
}
}()
}
57 changes: 51 additions & 6 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,8 @@ func (testValidator) Select(key string, vals [][]byte) (int, error) {
return idx, nil
}

// tests
func TestPubsubPublishSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

func setupTest(ctx context.Context, t *testing.T) (*PubsubValueStore, []*PubsubValueStore) {
key := "/namespace/key"
key2 := "/namespace/key2"

hosts := newNetHosts(ctx, t, 5)
vss := make([]*PubsubValueStore, len(hosts))
Expand Down Expand Up @@ -105,6 +100,19 @@ func TestPubsubPublishSubscribe(t *testing.T) {
// let the bootstrap finish
time.Sleep(time.Second * 1)

return pub, vss
}

// tests
func TestPubsubPublishSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pub, vss := setupTest(ctx, t)

key := "/namespace/key"
key2 := "/namespace/key2"

val := []byte("valid for key 1")
err := pub.PutValue(ctx, key, val)
if err != nil {
Expand Down Expand Up @@ -196,6 +204,43 @@ func TestPubsubPublishSubscribe(t *testing.T) {
}
}

func TestWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pub, vss := setupTest(ctx, t)

key := "/namespace/key"

ch, err := vss[1].SearchValue(ctx, key)
if err != nil {
t.Fatal(err)
}

val := []byte("valid for key 1")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}

v := string(<-ch)
if v != "valid for key 1" {
t.Errorf("got unexpected value: %s", v)
}

val = []byte("valid for key 2")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}

v = string(<-ch)
if v != "valid for key 2" {
t.Errorf("got unexpected value: %s", v)
}

}

func checkNotFound(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string) {
t.Helper()
_, err := vs.GetValue(ctx, key)
Expand Down

0 comments on commit 2822c1c

Please sign in to comment.