From 012608372c444781f6f402cac35fd32bbe194fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Aug 2018 21:14:08 +0200 Subject: [PATCH] Implement SearchValue --- package.json | 4 +-- pubsub.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 13a395c..61ac1a9 100644 --- a/package.json +++ b/package.json @@ -64,9 +64,9 @@ "version": "4.1.6" }, { - "hash": "QmS4niovD1U6pRjUBXivr1zvvLBqiTKbERjFo994JU7oQS", + "hash": "QmNMiBu1AaojCGVqPp1pibwhHq4zTuvgrHXEi2dH2AgEuo", "name": "go-libp2p-routing", - "version": "2.4.9" + "version": "2.5.0" }, { "author": "Stebalien", diff --git a/pubsub.go b/pubsub.go index e947b29..c82c5c2 100644 --- a/pubsub.go +++ b/pubsub.go @@ -22,6 +22,14 @@ import ( var log = logging.Logger("pubsub-valuestore") +type watchGroup struct { + lk sync.RWMutex + closing chan struct{} + + listeners map[int64]chan<- []byte + n int64 +} + type PubsubValueStore struct { ctx context.Context ds ds.Datastore @@ -36,6 +44,9 @@ type PubsubValueStore struct { mx sync.Mutex subs map[string]*floodsub.Subscription + watchLk sync.RWMutex + watching map[string]*watchGroup + Validator record.Validator } @@ -163,6 +174,37 @@ func (p *PubsubValueStore) GetValue(ctx context.Context, key string, opts ...rop return p.getLocal(key) } +func (p *PubsubValueStore) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { + if err := p.Subscribe(key); err != nil { + return nil, err + } + + p.watchLk.Lock() + wg, ok := p.watching[key] + if !ok { + wg = &watchGroup{ + closing: make(chan struct{}), + listeners: map[int64]chan<- []byte{}, + } + } + // Lock searchgroup before checking local storage so we don't miss updates + wg.lk.Lock() + p.watchLk.Unlock() + + out := make(chan []byte, 1) + lv, err := p.getLocal(key) + if err == nil { + out <- lv + } + + p.watchLk.Lock() + wg.add(ctx, out) + p.watchLk.Lock() + wg.lk.Unlock() + + return out, nil +} + // GetSubscriptions retrieves a list of active topic subscriptions func (p *PubsubValueStore) GetSubscriptions() []string { p.mx.Lock() @@ -188,6 +230,13 @@ func (p *PubsubValueStore) Cancel(name string) bool { delete(p.subs, name) } + p.watchLk.Lock() + if wg, ok := p.watching[name]; ok { + close(wg.closing) + delete(p.watching, name) + } + p.watchLk.Unlock() + return ok } @@ -208,10 +257,26 @@ func (p *PubsubValueStore) handleSubscription(sub *floodsub.Subscription, key st if err != nil { log.Warningf("PubsubResolve: error writing update for %s: %s", key, err) } + p.notifyWatchers(key, msg.GetData()) } } } +func (p *PubsubValueStore) notifyWatchers(key string, data []byte) { + p.watchLk.RLock() + sg, ok := p.watching[key] + p.watchLk.RUnlock() + if !ok { + return + } + + sg.lk.RLock() + for _, watcher := range sg.listeners { + watcher <- data + } + sg.lk.RUnlock() +} + // rendezvous with peers in the name topic through provider records // Note: rendezvous/boostrap should really be handled by the pubsub implementation itself! func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) { @@ -268,3 +333,38 @@ func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phos wg.Wait() } + +func (sg *watchGroup) add(ctx context.Context, outCh chan []byte) { + proxy := make(chan []byte, 1) + n := sg.n + sg.listeners[n] = proxy + sg.n++ + + go func() { + defer close(outCh) + defer func() { + sg.lk.Lock() + delete(sg.listeners, n) + //TODO: searchgroup GC? + sg.lk.Unlock() + }() + + select { + case val, ok := <-proxy: + if !ok { + return + } + + // outCh is buffered, so we just put the value or swap it for the newer one + select { + case outCh <- val: + case <-outCh: + outCh <- val + } + case <-sg.closing: + return + case <-ctx.Done(): + return + } + }() +}