-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomposite.go
91 lines (78 loc) · 2.46 KB
/
composite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package supervisor
import (
"context"
"sync"
"sync/atomic"
)
// compositeControl is the shared state handed to a composite's handler. It
// carries the cancellable context, the one-shot "open" flag, the wait groups
// used by Close() and Wait(), and the error slots returned by
// Open()/Close()/Wait().
type compositeControl struct {
	// Lifecycle: ctx is the context derived in newComposite and cancelFunc
	// cancels it.
	ctx        context.Context
	cancelFunc context.CancelFunc

	// open is 0 until tryOpen succeeds and 1 afterwards; it is accessed only
	// through sync/atomic.
	open uint32

	// closeWg is what Close() waits on; waitWg is what Wait() waits on
	// (exit of all components).
	closeWg sync.WaitGroup
	waitWg  sync.WaitGroup

	// Error slots reported by Open(), Close() and Wait() respectively.
	openError  compositeError
	closeError compositeError
	waitError  compositeError
}
// isOpen reports whether the open flag has been set, i.e. whether a previous
// tryOpen call succeeded. The flag is read atomically.
func (c *compositeControl) isOpen() bool {
	const opened uint32 = 1
	state := atomic.LoadUint32(&c.open)
	return state == opened
}
// tryOpen atomically flips the open flag from 0 to 1. It returns true only
// for the single caller that performed the transition; every other caller
// gets false.
func (c *compositeControl) tryOpen() bool {
	const (
		closed uint32 = 0
		opened uint32 = 1
	)
	won := atomic.CompareAndSwapUint32(&c.open, closed, opened)
	return won
}
// composite pairs a handler with the control state that the handler receives
// when Open() runs it.
type composite struct {
	// handler is invoked by Open() with control as its argument, at most once.
	handler func(control *compositeControl)
	// control holds the context, open flag, wait groups and error slots used
	// by Open(), Close() and Wait().
	control *compositeControl
}
// newComposite builds a composite around handler. The control state gets a
// cancellable context derived from ctx, so cancelling ctx also cancels the
// composite's context.
func newComposite(ctx context.Context, handler func(control *compositeControl)) *composite {
	derived, cancel := context.WithCancel(ctx)
	return &composite{
		handler: handler,
		control: &compositeControl{
			ctx:        derived,
			cancelFunc: cancel,
		},
	}
}
// Open runs the composite's handler and returns the accumulated open error.
// It must be called before Close(): if the context is already cancelled and
// the composite was never opened, ErrPrematurelyClosed is returned. The
// handler is invoked at most once; later calls only report the stored open
// error.
//
// Note: a caller that arrives while the first Open() is still inside the
// handler does not wait for it and returns the open error as it is at that
// moment.
func (c *composite) Open() error {
	ctl := c.control

	// A non-nil Err means the context's Done channel is already closed.
	if ctl.ctx.Err() != nil {
		if ctl.isOpen() {
			return ctl.openError.get()
		}
		// Closed before it was ever opened.
		return ErrPrematurelyClosed
	}

	// Only the caller that wins the flag transition runs the handler.
	if ctl.tryOpen() {
		c.handler(ctl)
	}
	return ctl.openError.get()
}
// Close initiates shutdown by cancelling the composite's context, blocks
// until closeWg is released, and returns the accumulated close error. This
// method may be called many times, including concurrently, and will return
// equal results. It's guaranteed that Close() method of all components will
// be called only once.
func (c *composite) Close() error {
	// Both steps are safe to repeat: a CancelFunc does nothing after its
	// first call, and WaitGroup.Wait returns immediately once the counter is
	// zero. Running them unconditionally makes a caller that arrives after
	// the context is already cancelled (a concurrent Close(), or a cancelled
	// parent context) wait for the in-flight shutdown instead of reading
	// closeError before it is complete.
	c.control.cancelFunc()
	c.control.closeWg.Wait()
	return c.control.closeError.get()
}
// Wait blocks until the composite's context is cancelled and waitWg is
// released (all components have exited), then returns the accumulated wait
// error. If the Wait() method of a component exits before Close(), all opened
// components will be closed. This method may be called many times and will
// return equal results. It's guaranteed that Wait() method of all components
// will be called only once.
func (c *composite) Wait() error {
	ctl := c.control

	// First the cancellation signal, then the exit of every component.
	<-ctl.ctx.Done()
	ctl.waitWg.Wait()

	return ctl.waitError.get()
}