forked from lightninglabs/neutrino
-
Notifications
You must be signed in to change notification settings - Fork 0
/
notifications.go
372 lines (321 loc) · 9.04 KB
/
notifications.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
// NOTE: THIS API IS UNSTABLE RIGHT NOW.
package neutrino
import (
"errors"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/connmgr"
"github.com/lightninglabs/neutrino/query"
)
// getConnCountMsg is the query handled by handleQuery that asks how many
// peers currently report themselves as connected.
type getConnCountMsg struct {
// reply receives the number of connected peers.
reply chan int32
}
// subConnPeersReply is the answer handleQuery sends back for a
// subConnPeersMsg.
type subConnPeersReply struct {
// peerChan is the buffered channel that handleQuery pre-fills with the
// peers connected at subscription time and registers in
// ChainService.peerSubscribers.
peerChan chan query.Peer
// cancelChan is registered alongside peerChan in the subscription; it is
// closed by the cancel function that ConnectedPeers returns.
cancelChan chan struct{}
}
// subConnPeersMsg is the query handled by handleQuery that requests a
// subscription to connected peers.
type subConnPeersMsg struct {
// reply receives the channels that make up the new subscription.
reply chan subConnPeersReply
}
// getPeersMsg is the query handled by handleQuery that requests the list of
// peers that are currently connected.
type getPeersMsg struct {
// reply receives a slice holding only the connected peers.
reply chan []*ServerPeer
}
// getOutboundGroup is the query handled by handleQuery that looks up the
// counter stored for one outbound group.
type getOutboundGroup struct {
// key is the outbound group key to look up in the peer state's
// outboundGroups map.
key string
// reply receives the stored count, or 0 when the key is absent.
reply chan int
}
// getAddedNodesMsg is the query handled by handleQuery that requests the
// list of persistent (added) peers.
type getAddedNodesMsg struct {
// reply receives a slice with every entry of the peer state's
// persistentPeers collection.
reply chan []*ServerPeer
}
// disconnectNodeMsg is the command handled by handleQuery that disconnects
// every outbound peer matched by cmp.
type disconnectNodeMsg struct {
// cmp reports whether the given peer is one that should be disconnected.
cmp func(*ServerPeer) bool
// reply receives nil when at least one peer matched, or a "peer not found"
// error otherwise.
reply chan error
}
// connectNodeMsg is the command handled by handleQuery that asks the
// connection manager to connect to a new peer.
type connectNodeMsg struct {
// addr is the address string of the peer to connect to; handleQuery
// resolves it with addrStringToNetAddr.
addr string
// permanent is forwarded as the Permanent flag of the connection request.
permanent bool
// reply receives nil once the connection attempt has been started, or an
// error explaining why it was rejected.
reply chan error
}
// removeNodeMsg is the command handled by handleQuery that disconnects a
// persistent peer matched by cmp.
type removeNodeMsg struct {
// cmp reports whether the given peer is the one that should be removed.
cmp func(*ServerPeer) bool
// reply receives nil when a peer matched, or a "peer not found" error
// otherwise.
reply chan error
}
// forAllPeersMsg is the command handled by handleQuery that runs a closure
// over the peers in the peer state. It has no reply channel, so the sender
// gets no completion signal.
type forAllPeersMsg struct {
// closure is invoked by the peer state's forAllPeers method.
closure func(*ServerPeer)
}
// TODO: General - abstract out more of blockmanager into queries. It'll make
// this way more maintainable and usable.
// handleQuery dispatches a single query or command that another goroutine
// sent about the peer state and answers it from the supplied state. Every
// message type except forAllPeersMsg carries a reply channel on which the
// result is delivered; values of any other type are ignored.
func (s *ChainService) handleQuery(state *peerState, querymsg interface{}) {
	// dropFromGroup keeps the outbound group counters accurate when a
	// peer is taken out of one of the peer lists.
	dropFromGroup := func(sp *ServerPeer) {
		state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
	}

	switch msg := querymsg.(type) {
	// Number of peers that report themselves as connected.
	case getConnCountMsg:
		var connected int32
		state.forAllPeers(func(sp *ServerPeer) {
			if sp.Connected() {
				connected++
			}
		})
		msg.reply <- connected

	// List of the peers that report themselves as connected.
	case getPeersMsg:
		connected := make([]*ServerPeer, 0, state.Count())
		state.forAllPeers(func(sp *ServerPeer) {
			if sp.Connected() {
				connected = append(connected, sp)
			}
		})
		msg.reply <- connected

	// Subscription for connected peers requested.
	case subConnPeersMsg:
		// The peer channel is buffered to the current peer count and
		// seeded with every peer that is connected right now.
		peers := make(chan query.Peer, state.Count())
		cancel := make(chan struct{})
		state.forAllPeers(func(sp *ServerPeer) {
			if sp.Connected() {
				peers <- sp
			}
		})

		// Remember the subscription on the service, then hand both
		// channels back to the requester.
		sub := &peerSubscription{
			peers:  peers,
			cancel: cancel,
		}
		s.peerSubscribers = append(s.peerSubscribers, sub)
		msg.reply <- subConnPeersReply{
			peerChan:   peers,
			cancelChan: cancel,
		}

	case connectNodeMsg:
		// TODO: duplicate oneshots?

		// Refuse the request once the total peer limit is reached.
		if state.Count() >= MaxPeers {
			msg.reply <- errors.New("max peers reached")
			return
		}

		// Refuse addresses that already belong to a persistent peer.
		for _, peer := range state.persistentPeers {
			if peer.Addr() != msg.addr {
				continue
			}

			reason := "peer exists as a permanent peer"
			if msg.permanent {
				reason = "peer already connected"
			}
			msg.reply <- errors.New(reason)
			return
		}

		netAddr, err := s.addrStringToNetAddr(msg.addr)
		if err != nil {
			msg.reply <- err
			return
		}

		// TODO: if too many, nuke a non-perm peer.

		// The connection attempt runs in its own goroutine; the caller
		// is answered without waiting for its outcome.
		req := &connmgr.ConnReq{
			Addr:      netAddr,
			Permanent: msg.permanent,
		}
		go s.connManager.Connect(req)
		msg.reply <- nil

	case removeNodeMsg:
		var err error
		if !disconnectPeer(state.persistentPeers, msg.cmp, dropFromGroup) {
			err = errors.New("peer not found")
		}
		msg.reply <- err

	case getOutboundGroup:
		// A key that is missing from the map yields the zero count.
		msg.reply <- state.outboundGroups[msg.key]

	// Request a list of the persistent (added) peers.
	case getAddedNodesMsg:
		added := make([]*ServerPeer, 0, len(state.persistentPeers))
		for _, sp := range state.persistentPeers {
			added = append(added, sp)
		}
		msg.reply <- added

	case disconnectNodeMsg:
		// Only the outbound peers are searched.
		if !disconnectPeer(state.outboundPeers, msg.cmp, dropFromGroup) {
			msg.reply <- errors.New("peer not found")
			return
		}

		// There may be several outbound connections to the same
		// ip:port, so keep disconnecting until nothing matches.
		for disconnectPeer(state.outboundPeers, msg.cmp, dropFromGroup) {
		}
		msg.reply <- nil

	case forAllPeersMsg:
		// TODO: Remove this when it's unnecessary due to wider use of
		// queryPeers.

		// No reply is sent: forAllPeers returns nothing, although an
		// error might be useful in the future.
		state.forAllPeers(msg.closure)
	}
}
// ConnectedCount asks the query handler how many peers are currently
// connected and returns the answer. It returns 0 if the service's quit
// channel fires before the request could be submitted.
func (s *ChainService) ConnectedCount() int32 {
	resp := make(chan int32)
	req := getConnCountMsg{reply: resp}

	select {
	case <-s.quit:
		return 0
	case s.query <- req:
	}

	return <-resp
}
// ConnectedPeers subscribes the caller to connected peers. It returns the
// subscription's peer channel, which the query handler pre-fills with the
// peers connected at the time of the request, and a cancel function. New
// peers are expected to be sent on the same channel as they connect.
//
// The cancel function closes the subscription's cancel channel, so it must
// be called at most once. ErrShuttingDown is returned if the service quits
// before the subscription has been set up.
func (s *ChainService) ConnectedPeers() (<-chan query.Peer, func(), error) {
	// Buffered so the handler's reply never blocks, even if we give up
	// waiting because the service is shutting down.
	resp := make(chan subConnPeersReply, 1)

	select {
	case <-s.quit:
		return nil, nil, ErrShuttingDown
	case s.query <- subConnPeersMsg{reply: resp}:
	}

	select {
	case <-s.quit:
		return nil, nil, ErrShuttingDown
	case sub := <-resp:
		cancel := func() {
			close(sub.cancelChan)
		}
		return sub.peerChan, cancel, nil
	}
}
// OutboundGroupCount asks the query handler for the counter stored under the
// given outbound group key and returns it. It returns 0 if the service's
// quit channel fires before the request could be submitted.
func (s *ChainService) OutboundGroupCount(key string) int {
	resp := make(chan int)
	req := getOutboundGroup{
		key:   key,
		reply: resp,
	}

	select {
	case <-s.quit:
		return 0
	case s.query <- req:
	}

	return <-resp
}
// AddedNodeInfo asks the query handler for the persistent (added) peers and
// returns them as a slice of *ServerPeer. It returns nil if the service's
// quit channel fires before the request could be submitted.
func (s *ChainService) AddedNodeInfo() []*ServerPeer {
	resp := make(chan []*ServerPeer)
	req := getAddedNodesMsg{reply: resp}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// Peers asks the query handler for all currently connected peers and returns
// them as a slice. It returns nil if the service's quit channel fires before
// the request could be submitted.
func (s *ChainService) Peers() []*ServerPeer {
	resp := make(chan []*ServerPeer)
	req := getPeersMsg{reply: resp}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// DisconnectNodeByAddr disconnects every peer whose address equals addr. The
// request is served by handleQuery, which searches the outbound peers. An
// error is returned if no matching peer was found.
//
// NOTE(review): if the service's quit channel fires before the request could
// be submitted, nil is returned even though nothing was disconnected.
func (s *ChainService) DisconnectNodeByAddr(addr string) error {
	resp := make(chan error)
	matchesAddr := func(sp *ServerPeer) bool {
		return sp.Addr() == addr
	}
	req := disconnectNodeMsg{
		cmp:   matchesAddr,
		reply: resp,
	}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// DisconnectNodeByID disconnects every peer whose node ID equals id. The
// request is served by handleQuery, which searches the outbound peers. An
// error is returned if no matching peer was found.
//
// NOTE(review): if the service's quit channel fires before the request could
// be submitted, nil is returned even though nothing was disconnected.
func (s *ChainService) DisconnectNodeByID(id int32) error {
	resp := make(chan error)
	matchesID := func(sp *ServerPeer) bool {
		return sp.ID() == id
	}
	req := disconnectNodeMsg{
		cmp:   matchesID,
		reply: resp,
	}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// RemoveNodeByAddr asks the query handler to remove the persistent peer
// whose address equals addr. An error is returned if no such peer was found.
//
// NOTE(review): if the service's quit channel fires before the request could
// be submitted, nil is returned even though nothing was removed.
func (s *ChainService) RemoveNodeByAddr(addr string) error {
	resp := make(chan error)
	matchesAddr := func(sp *ServerPeer) bool {
		return sp.Addr() == addr
	}
	req := removeNodeMsg{
		cmp:   matchesAddr,
		reply: resp,
	}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// RemoveNodeByID asks the query handler to remove the persistent peer whose
// node ID equals id. An error is returned if no such peer was found.
//
// NOTE(review): if the service's quit channel fires before the request could
// be submitted, nil is returned even though nothing was removed.
func (s *ChainService) RemoveNodeByID(id int32) error {
	resp := make(chan error)
	matchesID := func(sp *ServerPeer) bool {
		return sp.ID() == id
	}
	req := removeNodeMsg{
		cmp:   matchesID,
		reply: resp,
	}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// ConnectNode asks the query handler to start an outbound connection to
// addr. The permanent flag is passed through to the connection request as
// its Permanent field. The handler rejects the request when the peer limit
// is reached, when addr already belongs to a persistent peer, or when addr
// cannot be resolved; a nil result means the connection attempt was started,
// not that it succeeded.
//
// NOTE(review): if the service's quit channel fires before the request could
// be submitted, nil is returned even though no connection was attempted.
func (s *ChainService) ConnectNode(addr string, permanent bool) error {
	resp := make(chan error)
	req := connectNodeMsg{
		addr:      addr,
		permanent: permanent,
		reply:     resp,
	}

	select {
	case <-s.quit:
		return nil
	case s.query <- req:
	}

	return <-resp
}
// ForAllPeers submits closure to the query handler, which runs it over the
// peers in the peer state via the state's forAllPeers method. Nothing is
// returned because neither that method nor the closure returns anything.
//
// The request carries no reply channel, so this method returns as soon as
// the request has been handed to the query channel (or the service's quit
// channel fires) and does not wait for the closure to have run.
func (s *ChainService) ForAllPeers(closure func(sp *ServerPeer)) {
	req := forAllPeersMsg{closure: closure}

	select {
	case <-s.quit:
	case s.query <- req:
	}
}