Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SearchValue #7

Merged
merged 7 commits into from
Sep 24, 2018
Merged

Implement SearchValue #7

merged 7 commits into from
Sep 24, 2018

Conversation

magik6k
Copy link
Contributor

@magik6k magik6k commented Aug 31, 2018

Part of libp2p/go-libp2p-routing#25
Replaces #3

@ghost ghost assigned magik6k Aug 31, 2018
@ghost ghost added the status/in-progress In progress label Aug 31, 2018
@magik6k magik6k mentioned this pull request Aug 31, 2018
4 tasks
@Stebalien
Copy link
Member

Tell me when you want a review.

@magik6k magik6k changed the title [WIP] Implement SearchValue Implement SearchValue Sep 1, 2018
@magik6k magik6k requested a review from Stebalien September 1, 2018 15:17
@bigs bigs added the feature label Sep 11, 2018
pubsub.go Outdated
lk sync.RWMutex
closing chan struct{}

listeners map[int64]chan<- []byte
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably just make this a map[chan<- []byte]struct{}. Then we don't need to give each listener an ID.

pubsub.go Outdated
defer func() {
wg.lk.Lock()
delete(wg.listeners, n)
//TODO: watchgroup GC?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well. Just if len(wg.listeners) == 0 { /* lock and remove watch group */ }.

pubsub.go Outdated
go func() {
defer close(outCh)
defer func() {
wg.lk.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a deadlock here if notifyWatchers is trying to write to our channel (it can write twice in a row so the buffer doesn't help). Unfortunately, the watcher <- data write may also have to select on the context (or at least the context cancellation channel). The map will probably need to be map[chan <-[]byte]context.Context.

pubsub.go Outdated

sg.lk.RLock()
for _, watcher := range sg.listeners {
watcher <- data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also (in addition to the context) needs to select on sg.closing as we're not holding the watchLk. Currently, sg.closing can be closed, making all the listeners exit, and we'll hang here forever.

@Stebalien
Copy link
Member

Wow... this is a lot tricker than I thought it would be.

@magik6k magik6k requested a review from Stebalien September 14, 2018 03:08
pubsub.go Show resolved Hide resolved
pubsub.go Outdated

go func() {
ctx, cancel := context.WithCancel(ctx)
wg.listeners[outCh] = ctx
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to happen outside the goroutine (while we're holding the lock).

@magik6k
Copy link
Contributor Author

magik6k commented Sep 16, 2018

I've replaced one of the locks with a channel+managing goroutine, opinions?

@magik6k magik6k requested a review from Stebalien September 16, 2018 15:16
pubsub.go Outdated
closing chan struct{}

register chan watcher
unregister chan []chan<- []byte
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather use a struct for these, even if it's an anonymous one (to help newcomers to this code).

(assuming we need multiple channels)

pubsub.go Outdated
ctx, cancel := context.WithCancel(ctx)
wg.register <- watcher{ctx: ctx, out: proxy}

// we wait for ack from watchgroup manager, essentially transferring our lock
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really shouldn't do this. Implicitly transferring a lock is guaranteed to trip someone up down the road.

At this point, we effectively have one big lock anyways (as far as I can tell), we might as well just use it and drop the watch manager.

pubsub.go Outdated Show resolved Hide resolved
@magik6k magik6k force-pushed the feat/searchvalue branch 5 times, most recently from 291417b to fe784ca Compare September 23, 2018 13:56
@magik6k
Copy link
Contributor Author

magik6k commented Sep 23, 2018

Done

p.watchLk.Lock()
if _, wok := p.watching[name]; wok {
p.watchLk.Unlock()
return false, errors.New("key has active subscriptions")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, "cancel" should just cancel the passive subscription (keeping the active ones running, not returning an error). However, we can change that behavior later.

pubsub.go Outdated
proxy := make(chan []byte, 1)

ctx, cancel := context.WithCancel(ctx)
wg.listeners[proxy] = ctx
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Now that we're doing the take/replace dance with the proxy channel, we don't really need to store the context anywhere (we also don't need to wrap with a cancel). We only needed it to avoid hanging while writing to the proxy channel.

However, it doesn't hurt to do this so it's up to you.

@magik6k magik6k merged commit 19da183 into master Sep 24, 2018
@ghost ghost removed the status/in-progress In progress label Sep 24, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants