-
Notifications
You must be signed in to change notification settings - Fork 6
/
mailbox.go
145 lines (126 loc) · 2.91 KB
/
mailbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package grid
import (
"errors"
"sync"
)
// errDeregisteredFailed is used internally when a key cannot be
// deregistered from etcd. Server.startActor() relies on it to ensure
// we panic.
var errDeregisteredFailed = errors.New("grid: deregistered failed")
// mailboxRegistry is a collection of named mailboxes. Its accessor
// methods (Get, Set, Delete, Size, R) take mu before touching r.
type mailboxRegistry struct {
// mu guards r.
mu sync.RWMutex
// r maps the name given to Set to the mailbox stored under it.
r map[string]*GRPCMailbox
}
// newMailboxRegistry returns an empty registry whose map is already
// allocated, so it can be written to right away.
func newMailboxRegistry() *mailboxRegistry {
	reg := new(mailboxRegistry)
	reg.r = make(map[string]*GRPCMailbox)
	return reg
}
// Get looks up the mailbox stored under name. found reports whether an
// entry exists; when it does not, m is nil.
func (r *mailboxRegistry) Get(name string) (m *GRPCMailbox, found bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	box, ok := r.r[name]
	return box, ok
}
// Set stores m under name, replacing any mailbox already registered
// there. update reports whether an entry for name existed beforehand.
func (r *mailboxRegistry) Set(name string, m *GRPCMailbox) (update bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	_, existed := r.r[name]
	r.r[name] = m
	return existed
}
// Delete removes the mailbox registered under name. found reports
// whether there was an entry to remove.
func (r *mailboxRegistry) Delete(name string) (found bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if _, ok := r.r[name]; !ok {
		return false
	}
	delete(r.r, name)
	return true
}
// Size returns the number of mailboxes currently in the registry.
func (r *mailboxRegistry) Size() int {
	r.mu.RLock()
	n := len(r.r)
	r.mu.RUnlock()
	return n
}
// R returns a shallow copy of the underlying registry: the returned map
// is a new map the caller may modify freely, but its values are the same
// *GRPCMailbox pointers held by the registry.
func (r *mailboxRegistry) R() map[string]*GRPCMailbox {
	r.mu.RLock()
	defer r.mu.RUnlock()

	snapshot := make(map[string]*GRPCMailbox, len(r.r))
	for name, box := range r.r {
		snapshot[name] = box
	}
	return snapshot
}
// Mailbox is the receiving side of a mailbox: a channel of requests plus
// a way to close it. GRPCMailbox provides both methods.
type Mailbox interface {
// C returns the receive-only channel on which requests are delivered.
C() <-chan Request
// Close closes the mailbox.
Close() error
}
// GRPCMailbox for receiving messages.
type GRPCMailbox struct {
// mu protects c and closed. put holds it for reading while it sends;
// Close holds it for writing while it closes c.
mu sync.RWMutex
// c is the channel put sends requests on; Close closes it.
c chan Request
// closed is set by Close; once true, put rejects every request.
closed bool
// name is the mailbox name without namespace, returned by Name.
name string
// nsName is the mailbox name with full namespace, returned by String.
nsName string
// requests is the receive-only channel handed out by C.
// NOTE(review): presumably the receiving view of c — confirm where the
// mailbox is constructed.
requests <-chan Request
// once makes the body of Close run at most one time.
once sync.Once
// cleanup is the server-provided clean up that Close runs after the
// channel has been closed.
cleanup func()
}
// C returns the receive-only channel from which requests are read.
// NOTE(review): presumably this is the receiving end of the channel put
// sends on — confirm where the mailbox is constructed.
func (box *GRPCMailbox) C() <-chan Request {
return box.requests
}
// Close the mailbox. The work is wrapped in a sync.Once, so Close may be
// called any number of times; only the first call closes the channel and
// runs the clean up. Close always returns nil.
func (box *GRPCMailbox) Close() error {
	box.once.Do(func() {
		// Close the channel and set the flag under the write lock: put
		// holds the read lock for its whole send, so it can never send
		// on a channel that is being closed.
		box.mu.Lock()
		close(box.c)
		box.closed = true
		box.mu.Unlock()

		// Run server-provided clean up, if there is one. Calling a nil
		// func panics, and a panic inside once.Do counts as a completed
		// call, which would leave the mailbox closed but never cleaned
		// up and with no chance to retry.
		if box.cleanup != nil {
			box.cleanup()
		}
	})
	return nil
}
// Name returns the name of the mailbox, without namespace.
func (box *GRPCMailbox) Name() string {
return box.name
}
// String returns the mailbox name with its full namespace.
func (box *GRPCMailbox) String() string {
return box.nsName
}
// put offers req to the mailbox without blocking. It returns nil when the
// request was handed to the channel, and ErrReceiverBusy when the mailbox
// has been closed or the channel cannot take the request right now.
//
// NOTE (2022-01) (mh): the read lock must be held for the whole call,
// send included, which is why the unlock is deferred. Close closes the
// channel under the write lock, so:
//
//	RUnlock() right after reading box.closed (unsafe):
//	  goroutine 1 (put)  : RLock - RUnlock() - send (!!!)
//	  goroutine 2 (close): Lock - close - Unlock
//
//	RUnlock() at the end (safe):
//	  goroutine 1 (put)  : RLock - send - RUnlock()
//	  goroutine 2 (close): Lock - close - Unlock
func (box *GRPCMailbox) put(req *request) error {
	box.mu.RLock()
	defer box.mu.RUnlock()

	if !box.closed {
		select {
		case box.c <- req:
			return nil
		default:
			// Nobody can take the request right now; fall through.
		}
	}
	return ErrReceiverBusy
}