-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
220 lines (194 loc) · 4.92 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Package grpcpool provides a pool of grpc clients
package grpcpool
import (
"context"
"errors"
"sync"
"time"
"google.golang.org/grpc"
)
var (
	// ErrClosed is returned by Get and ClientConn.Close when the pool has
	// already been shut down via Pool.Close.
	ErrClosed = errors.New("grpc pool: client pool is closed")
	// ErrTimeout is returned by Get when the context is done before a
	// client becomes available.
	ErrTimeout = errors.New("grpc pool: client pool timed out")
	// ErrAlreadyClosed is returned by ClientConn.Close when the wrapper's
	// connection was already returned to the pool.
	ErrAlreadyClosed = errors.New("grpc pool: the connection was already closed")
	// ErrFullPool is returned by ClientConn.Close when the pool channel is
	// full; with correct pairing of Get/Close this should not happen.
	ErrFullPool = errors.New("grpc pool: closing a ClientConn into a full pool")
)
// Factory is a function type creating a grpc client. It is invoked by New
// for the initial connections and by Get whenever a pooled wrapper holds no
// live connection (empty placeholder or expired idle client).
type Factory func() (*grpc.ClientConn, error)
// Pool is the grpc client pool. The channel always holds exactly `capacity`
// wrappers when idle: live connections plus empty placeholders that Get
// fills lazily via the factory.
type Pool struct {
	clients     chan ClientConn // buffered to capacity; nil once the pool is closed
	factory     Factory         // dials a new *grpc.ClientConn
	idleTimeout time.Duration   // 0 means clients never expire
	mu          sync.RWMutex    // guards swapping clients to nil in Close
}
// ClientConn is the wrapper for a grpc client conn. Its Close method
// returns the connection to the pool rather than closing it.
type ClientConn struct {
	*grpc.ClientConn           // nil marks an empty placeholder / returned wrapper
	pool             *Pool     // owning pool, set when the wrapper is created
	timeUsed         time.Time // last checkout time, compared against idleTimeout
	unhealthy        bool      // set via Unhealhty; forces a fresh dial on next Get
}
// New creates a clients pool with the given initial and maximum capacity,
// and the timeout after which idle clients are recycled. It returns an
// error if any of the initial clients cannot be created.
func New(factory Factory, init, capacity int, idleTimeout time.Duration) (*Pool, error) {
	// Clamp the requested sizes to a sane range.
	if capacity <= 0 {
		capacity = 1
	}
	if init < 0 {
		init = 0
	} else if init > capacity {
		init = capacity
	}

	p := &Pool{
		clients:     make(chan ClientConn, capacity),
		factory:     factory,
		idleTimeout: idleTimeout,
	}

	// Eagerly dial the requested number of initial connections.
	for i := 0; i < init; i++ {
		conn, err := factory()
		if err != nil {
			return nil, err
		}
		p.clients <- ClientConn{
			ClientConn: conn,
			pool:       p,
			timeUsed:   time.Now(),
		}
	}

	// Top up the channel with empty placeholders so it always holds
	// exactly `capacity` wrappers; Get dials lazily for these.
	for len(p.clients) < capacity {
		p.clients <- ClientConn{pool: p}
	}

	return p, nil
}
// getClients snapshots the clients channel under the read lock, so that
// callers never race with Close swapping the field to nil.
func (p *Pool) getClients() chan ClientConn {
	p.mu.RLock()
	clients := p.clients
	p.mu.RUnlock()
	return clients
}
// Close empties the pool calling Close on all its clients.
// You can call Close while there are outstanding clients.
// The pool channel is then closed, and Get will not be allowed anymore.
func (p *Pool) Close() {
	p.mu.Lock()
	clients := p.clients
	p.clients = nil
	p.mu.Unlock()

	if clients == nil {
		return
	}

	close(clients)
	// Drain by ranging over the local (now closed) channel. The previous
	// counted loop used p.Capacity(), which reports 0 once the pool is
	// marked closed, so no pooled connection was ever actually closed.
	for client := range clients {
		if client.ClientConn == nil {
			// Empty placeholder — nothing to tear down.
			continue
		}
		client.ClientConn.Close()
	}
}
// IsClosed returns true if the client pool is closed.
func (p *Pool) IsClosed() bool {
	if p == nil {
		return true
	}
	return p.getClients() == nil
}
// Get will return the next available client. If the pooled wrapper is an
// empty placeholder, or its connection has been idle longer than the pool's
// idleTimeout, a new connection is created via the factory. Otherwise it
// waits until the next client becomes available or ctx is done.
// A context without a deadline means an indefinite wait.
func (p *Pool) Get(ctx context.Context) (*ClientConn, error) {
	clients := p.getClients()
	if clients == nil {
		return nil, ErrClosed
	}

	var wrapper ClientConn
	select {
	case client, ok := <-clients:
		if !ok {
			// The pool was closed while we were waiting. Without this
			// check a zero-value wrapper would flow through, and a
			// factory failure below would send into the closed channel
			// and panic.
			return nil, ErrClosed
		}
		wrapper = client
	case <-ctx.Done():
		return nil, ErrTimeout
	}

	// If the wrapper is old, close the connection and create a new one.
	// It's safe to assume that there isn't any newer client, as the client
	// we fetched is the first in the (FIFO) channel.
	idleTimeout := p.idleTimeout
	if wrapper.ClientConn != nil && idleTimeout > 0 &&
		wrapper.timeUsed.Add(idleTimeout).Before(time.Now()) {
		wrapper.ClientConn.Close()
		wrapper.ClientConn = nil
	} else {
		wrapper.timeUsed = time.Now()
	}

	var err error
	if wrapper.ClientConn == nil {
		wrapper.ClientConn, err = p.factory()
		if err != nil {
			// Put a placeholder back in the channel so the pool keeps
			// its full complement of wrappers.
			clients <- ClientConn{
				pool: p,
			}
		}
	}
	return &wrapper, err
}
// Unhealhty marks the client conn as unhealthy, so that the connection
// gets reset (closed and replaced by a fresh dial) when it is returned
// via Close.
// NOTE(review): the exported name is misspelled ("Unhealhty" for
// "Unhealthy") but is part of the public API; renaming would break
// callers.
func (c *ClientConn) Unhealhty() {
	c.unhealthy = true
}
// Close returns a ClientConn to the pool; it does not close the underlying
// gRPC connection unless the wrapper was marked unhealthy. It is safe to
// call multiple times, but returns ErrAlreadyClosed after the first call,
// ErrClosed if the pool has been closed, and ErrFullPool if the pool
// channel is unexpectedly full.
func (c *ClientConn) Close() error {
	if c == nil {
		return nil
	}
	if c.ClientConn == nil {
		return ErrAlreadyClosed
	}

	// Snapshot the channel under the pool's lock instead of reading
	// c.pool.clients directly: Pool.Close swaps that field while holding
	// the same lock, so an unguarded read is a data race.
	clients := c.pool.getClients()
	if clients == nil {
		return ErrClosed
	}

	// We're cloning the wrapper so we can set ClientConn to nil in the one
	// used by the user.
	wrapper := ClientConn{
		pool:       c.pool,
		ClientConn: c.ClientConn,
		timeUsed:   time.Now(),
	}
	if c.unhealthy {
		// Tear down the broken connection now — previously it was only
		// dropped from the wrapper, leaking the gRPC connection — and
		// pool an empty placeholder so the next Get dials afresh.
		wrapper.ClientConn.Close()
		wrapper.ClientConn = nil
	}

	select {
	case clients <- wrapper:
		// Returned to the pool.
	default:
		return ErrFullPool
	}

	c.ClientConn = nil // Mark as closed
	return nil
}
// Capacity returns the total number of clients the pool can hold, or 0
// when the pool is nil or closed.
func (p *Pool) Capacity() int {
	if p == nil {
		return 0
	}
	// Snapshot under the lock: reading p.clients directly would race with
	// Close, which replaces the field while holding the mutex.
	clients := p.getClients()
	if clients == nil {
		return 0
	}
	return cap(clients)
}
// Available returns the number of currently unused clients, or 0 when the
// pool is nil or closed.
func (p *Pool) Available() int {
	if p == nil {
		return 0
	}
	// Snapshot under the lock: reading p.clients directly would race with
	// Close, which replaces the field while holding the mutex.
	clients := p.getClients()
	if clients == nil {
		return 0
	}
	return len(clients)
}