-
Notifications
You must be signed in to change notification settings - Fork 2
/
pipe.go
127 lines (112 loc) · 2.8 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package eventbus
import (
	"reflect"
	"sync"
)
// Handler is a callback that receives a single payload of type T.
// A Pipe invokes every subscribed Handler once for each published payload.
type Handler[T any] func(payload T)
// Pipe is a wrapper for a channel that allows for asynchronous message passing to subscribers.
// Use Pipe.Publish() instead of `chan<-` and Pipe.Subscribe() instead of `<-chan`.
// To pass messages to subscribers synchronously, use Pipe.PublishSync(), which does not use a channel.
// If multiple subscribers exist, each subscriber will receive the message.
type Pipe[T any] struct {
// RWMutex guards closed. Subscribe, Unsubscribe, Publish and PublishSync
// hold the read lock; Close holds the write lock.
sync.RWMutex
// bufferSize is -1 for a pipe made by NewPipe, otherwise the channel
// capacity chosen by NewBufferedPipe.
bufferSize int
// channel carries payloads from Publish to the loop goroutine.
channel chan T
// handlers holds the subscribed Handler[T] values.
handlers *CowMap
// closed is set by Close; once true the other methods return ErrChannelClosed.
closed bool
// stopCh tells the loop goroutine to exit.
stopCh chan any
}
// NewPipe creates an unbuffered pipe and starts the goroutine that delivers
// published payloads to its subscribers.
// The pipe's bufferSize is recorded as -1 to mark it as unbuffered.
func NewPipe[T any]() *Pipe[T] {
	pipe := new(Pipe[T])
	pipe.bufferSize = -1
	pipe.channel = make(chan T)
	pipe.stopCh = make(chan any)
	pipe.handlers = NewCowMap()

	go pipe.loop()
	return pipe
}
// NewBufferedPipe creates a buffered pipe whose channel has capacity bufferSize
// and starts the goroutine that delivers published payloads to its subscribers.
// A bufferSize of zero or less is replaced by 1.
// With a buffered pipe, Publish does not block while the buffer has free space.
func NewBufferedPipe[T any](bufferSize int) *Pipe[T] {
	size := bufferSize
	if size < 1 {
		size = 1
	}

	pipe := new(Pipe[T])
	pipe.bufferSize = size
	pipe.channel = make(chan T, size)
	pipe.stopCh = make(chan any)
	pipe.handlers = NewCowMap()

	go pipe.loop()
	return pipe
}
// loop runs until a value arrives on stopCh. Each payload received from the
// pipe's channel is passed, in turn, to every handler currently stored in
// p.handlers.
func (p *Pipe[T]) loop() {
	// deliver hands one payload to all registered handlers.
	deliver := func(msg T) {
		p.handlers.Range(func(_ any, h any) bool {
			h.(Handler[T])(msg)
			return true // keep iterating over the remaining handlers
		})
	}

	for {
		select {
		case <-p.stopCh:
			return
		case msg := <-p.channel:
			deliver(msg)
		}
	}
}
// Subscribe registers handler so that it receives every payload published on
// the pipe. It returns ErrChannelClosed if the pipe has been closed.
//
// The handler is stored under the code pointer of its underlying function
// (reflect.Value.Pointer) so that Unsubscribe, given the same function, computes
// the same key. Keying by the address of the handler parameter does not work:
// that address belongs to this call's own copy of the argument and can never be
// reproduced by a later Unsubscribe call.
//
// Subscribing the same function again reuses the same key. The reflect
// documentation notes that a code pointer does not necessarily identify a
// function uniquely, so closures created from one function literal may share a
// key.
func (p *Pipe[T]) Subscribe(handler Handler[T]) error {
	p.RLock()
	defer p.RUnlock()
	if p.closed {
		return ErrChannelClosed
	}
	p.handlers.Store(reflect.ValueOf(handler).Pointer(), handler)
	return nil
}

// Unsubscribe removes the entry that Subscribe stored for handler. The key is
// derived from the handler's code pointer, exactly as in Subscribe.
// It returns ErrChannelClosed if the pipe has been closed.
func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error {
	p.RLock()
	defer p.RUnlock()
	if p.closed {
		return ErrChannelClosed
	}
	p.handlers.Delete(reflect.ValueOf(handler).Pointer())
	return nil
}
// Publish sends payload on the pipe's channel; the loop goroutine then passes
// it to the subscribed handlers. The call blocks until the channel accepts the
// payload, and the read lock is held for that whole time.
// It returns ErrChannelClosed if the pipe has been closed.
func (p *Pipe[T]) Publish(payload T) error {
	p.RLock()
	defer p.RUnlock()

	if !p.closed {
		p.channel <- payload
		return nil
	}
	return ErrChannelClosed
}
// PublishSync calls every subscribed handler with payload directly on the
// caller's goroutine, bypassing the channel. It returns once all handlers have
// run, or ErrChannelClosed if the pipe has been closed.
func (p *Pipe[T]) PublishSync(payload T) error {
	p.RLock()
	defer p.RUnlock()

	if p.closed {
		return ErrChannelClosed
	}

	// invoke calls one stored handler and asks Range to continue.
	invoke := func(_ any, h any) bool {
		h.(Handler[T])(payload)
		return true
	}
	p.handlers.Range(invoke)
	return nil
}
// Close shuts the pipe down: it marks the pipe closed, tells the loop goroutine
// to stop, and then closes the payload channel. Calling Close on a pipe that is
// already closed does nothing.
//
// The stop signal is a send on the unbuffered stopCh, so Close blocks (holding
// the write lock) until the loop goroutine receives it. The payload channel is
// closed only after that.
func (p *Pipe[T]) Close() {
	p.Lock()
	defer p.Unlock()

	if !p.closed {
		p.closed = true
		p.stopCh <- struct{}{}
		close(p.channel)
	}
}