-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubscription.go
64 lines (56 loc) · 1.6 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package rx
import (
"sync"
)
// Subscription is an interface that allows code to monitor and control a
// subscription it received.
type Subscription interface {
Subscribable
// Wait will by default block the calling goroutine and wait for the
// Unsubscribe method to be called on this subscription.
// However, when OnWait was called with a callback wait function it will
// call that instead. Calling Wait on a subscription that has already been
// canceled will return immediately. If the subscriber was canceled by
// calling Unsubscribe, then the error returned is ErrUnsubscribed.
// If the subscriber was terminated by calling Done, then the error
// returned here is the one passed to Done.
Wait() error
}
type subscription struct {
subscriber
wait func()
err error
}
// SubscriptionCanceled is the error returned by Wait when the Unsubscribe method is
// called on an active subscription.
const SubscriptionCanceled = Error("subscription canceled")
func newSubscription(scheduler Scheduler) *subscription {
s := &subscription{err: SubscriptionCanceled}
if !scheduler.IsConcurrent() {
s.wait = scheduler.Wait
}
return s
}
// Done will set the error internally and then cancel the subscription by
// calling the Unsubscribe method. A nil value for error indicates success.
func (s *subscription) Done(err error) {
s.Lock()
s.err = err
s.Unlock()
s.Unsubscribe()
}
func (s *subscription) Wait() error {
if wait := s.wait; wait != nil {
wait()
}
if s.Subscribed() {
var wg sync.WaitGroup
wg.Add(1)
s.OnUnsubscribe(wg.Done)
wg.Wait()
}
s.Lock()
err := s.err
s.Unlock()
return err
}